介绍
假设您有一个数据库处理OLTP查询。为了解决密集的分析BI报告,您设置了一个对Clickhouse等适用于OLAP的数据库。您如何同步追随者数据库(这里是Clickhouse)?您应该为哪些挑战做好准备?
在数据密集型应用程序中同步两个或多个数据库是您之前或现在正在处理的常规例程之一。由于更改数据捕获(CDC)和Kafka等技术,此过程不再复杂。但是,根据您使用的数据库,如果源数据库在OLTP范式中工作,并且OLAP中的目标是源数据库,则可能具有挑战性。在本文中,我将从MySQL作为Clickhouse作为目的地的来源浏览此过程。尽管我将这篇文章限于这些技术,但对于类似情况来说,这是相当普遍的。
系统设计概述
与听起来相反,它非常简单。数据库更改是通过Debezium捕获的,并在Apache Kafka上发表。 Clickhouse由Kafka Engine分区消耗这些变化。实时并最终保持一致。
案例分析
想象我们在MySQL中有一个 orders 表,以下DDL:
CREATE TABLE `orders` (
`id` int(11) NOT NULL,
`status` varchar(50) NOT NULL,
`price` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
用户可以创建,删除和更新任何列或整个记录。我们想捕获其更改并将其沉入Clickhouse以同步。
我们将使用debezium v2.1 和 inplecingmergetree 发动机
执行
步骤1:与Debezium的CDC
大多数数据库都有一个日志,该日志在应用数据之前都写在此处(写入日志或WAL)。在MySQL中,此文件称为binlog。如果您读取该文件,请解析并将其应用于您的目标数据库,则遵循更改数据捕获(CDC)清单。
CDC是同步两个或多个异质数据库的最佳方法之一。它是实时的,最终是一致的,并阻止您无法采取更多的成本,例如带有气流的批处理。无论源上发生了什么,您都可以按顺序捕获它,并且与原件一致(当然,当然!)
debezium是用于阅读和解析Binlog的众所周知的工具。它只是与Kafka Connect作为连接器集成,并在Kafka主题上产生所有更改。
为此,您要在MySQL数据库上启用log-bin,并相应地设置Kafka Connect,Kafka和Debezium。由于它在this或this(例如this)等其他文章中得到了很好的解释,因此我只专注于为我们的目的定制的Debezium配置:捕获更改,同时由Clickhouse进行功能和解析。
显示整个配置之前,我们应该讨论三个必要的配置:
提取新的记录状态
debezium在和 em> em>“默认情况下” 和之前散发出每条记录。此外,它会在删除操作(同样是由Clickhouse无法避免的情况下)创建墓碑记录(即具有零值的记录)。整个行为已在下表中证明。
我们在Debezium配置中使用ExtractNewRecod变压器来解决问题。多亏了此选项,Debezium仅在 状态之后保持 创建/更新操作,并忽略了以前的状态。但是,作为缺点,它删除了 delete 记录,其中包含先前的状态和前面提到的墓碑记录。换句话说,您将不再捕获删除操作了。不用担心!我们将在下一部分中解决它。
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
下图显示了如何通过使用 extractNewRecord 配置来删除之前的状态 。
重写删除事件
要捕获删除操作,我们必须添加重写 config如下:
"transforms.unwrap.delete.handling.mode":"rewrite"
debezium与此配置一起添加了字段__deleted
,这对于删除操作是正确的,而对其他配置则是错误的。因此,删除将包含以前的状态以及__deleted: true
字段。
提供上述配置,更新记录(主键除外的每一列),以新状态发出简单的记录。可以拥有另一个具有相同DDL的关系数据库,因为更新的记录替换了目标中的前一个。但是在Clickhouse的情况下,故事出错了! 在我们的示例中,源将 id 用作主要键,ClickHouse使用 id 和 status 作为订单键。替换和唯一性仅保证具有相同 id 和状态的记录!那么,如果源更新状态列会发生什么?我们最终获得了重复的记录,这意味着等于 ids ,但Clikhouse中的不同状态! 幸运的是,有一种方法。默认情况下,Debezium创建删除记录和一个创建记录,用于更新主键。因此,如果源更新 id ,它将以先前的 id 和a 创建的记录 delete 记录。新的 id 。前一个带有 现在,通过将上述所有选项组合在一起,我们将具有功能齐全的Debezium配置,能够处理Clickhouse所需的任何更改: 通过更改连接器的密钥列,Debezium使用这些列作为主题键,而不是源表的默认主键。因此,与数据库记录相关的不同操作可能最终在Kafka的其他分区中。由于记录在不同的分区中丢失了订单,因此,除非您确保ClickHouse订单键和Debezium消息键相同,否则它可能会导致Clikchouse的不一致。 经验法则如下: 根据所需的表设计设计分区密钥和顺序键。 提取分区和排序键的源起源,假设它们是在物质化过程中计算的。 联合所有这些列 将步骤3的结果定义为 message.column.keys 在Debezium Connector配置中。 检查Clickhouse排序键是否具有所有这些列。如果没有,请添加。 Clickhouse可以通过使用Kafka Engine将Kafka记录陷入表中。我们需要定义三个表:kafka表,消费者材料表和主表。 kafka表定义了记录结构,旨在阅读的kafka主题。 Kafka表中的每张记录都只能阅读一次,因为其消费者群会碰到偏移量,我们可以阅读两次。因此,我们需要定义一个主表,并通过视图材料器将每个kafka表记录归于它: 主表具有源结构加上 最后,我们需要过滤每个已删除的记录(因为我们不想看到它们),并且在具有相同类型键的不同记录的情况下,有最新记录。可以使用 final 修饰符来解决这。但是,为了避免在每个查询中使用过滤器和最终方法,我们可以定义一个简单的视图以隐式地完成工作: 注意:对于每个查询,尤其是在生产中,最终使用最终的效率低下。您可以使用聚合查看最后记录或等待Clickhouse在后台合并记录。 在本文中,我们看到了如何通过CDC同步Clickhouse数据库,并使用软耗尽方法来防止重复。
处理非主要键更新
__deleted=ture
字段的人取代了我们在CH中的失速记录。然后,可以在视图中过滤删除的记录。我们可以使用以下选项将此行为扩展到其他列:
"message.key.columns": "inventory.orders:id;inventory.orders:status"
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.include.list": "inventory",
"database.password": "mypassword",
"database.port": "3306",
"database.server.id": "2",
"database.server.name": "dbz.inventory.v2",
"database.user": "root",
"message.key.columns": "inventory.orders:id;inventory.orders:status",
"name": "mysql-connector-v2",
"schema.history.internal.kafka.bootstrap.servers": "broker:9092",
"schema.history.internal.kafka.topic": "dbz.inventory.history.v2",
"snapshot.mode": "schema_only",
"table.include.list": "inventory.orders",
"topic.prefix": "dbz.inventory.v2",
"transforms": "unwrap",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
重要的是:如何选择Debezium键列?
步骤2:Clickhouse表
卡夫卡桌
CREATE TABLE default.kafka_orders
(
`id` Int32,
`status` String,
`price` String,
`__deleted` Nullable(String)
)
ENGINE = Kafka('broker:9092', 'inventory.orders', 'clickhouse', 'AvroConfluent')
SETTINGS format_avro_schema_registry_url = 'http://schema-registry:8081'
消费者材料化合物
CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
`id` Int32,
`status` String,
`price` String,
`__deleted` Nullable(String)
) AS
SELECT
id AS id,
status AS status,
price AS price,
__deleted AS __deleted
FROM default.kafka_orders
主表
__deleted
字段。我使用替换合并树,因为我们需要用删除或更新的摊位记录替换失速记录。
CREATE TABLE default.stream_orders
(
`id` Int32,
`status` String,
`price` String,
`__deleted`String
)
ENGINE = ReplacingMergeTree
ORDER BY (id, price)
SETTINGS index_granularity = 8192
查看表
CREATE VIEW default.orders
(
`id` Int32,
`status` String,
`price` String,
`__deleted` String
) AS
SELECT *
FROM default.stream_orders
FINAL
WHERE __deleted = 'false'
结论