介绍
什么是更改数据捕获?在一个数据源中发生的更改将在另一个数据源中捕获并复制。让我们用图片理解它。
a :数据库保持交易日志,其中所有DDL&DML查询都在其中。 MySQL维护Binlog和PostgreSQL维护WAL(写入日志)。
B&C :源连接器连接到数据库并读取事务日志。然后,它准备了变更的内存结构,并将其写入消息代理。通常,CDC系统维护更改的单个架构,以使不同的数据库连接器将数据转换为其中。
d&e :接收器连接器连接到消息经纪,读取数据并将更改写入目标数据库。
CDC的用例
让我们考虑软件工程中CDC的不同用例。
-
OLAP(在线分析处理)系统使用CDC将数据从交易数据库迁移到分析数据库。
-
OLTP(在线交易处理)系统还可以使用CDC作为事件总线来复制不同的数据存储中的数据。例如,从MySQL到Elasticsearch。
流行和广泛使用的系统之一是debezium。在此博客文章中,我们将讨论如何设置Debezium及其内部工作方式。
Debezium
Debezium在Kafka Connect的顶部工作。 Kafka Connect是在多个系统之间流式传输数据的框架。部署后,它将提供REST API来管理连接器。有两个连接器,源和水槽。从上图中看到,源连接器读取源数据库并将数据写入KAFKA主题。接收器连接器从这些KAFKA主题中读取并写入目标数据库。 Debezium是使用Kafka Connect框架构建的。它带有不同数据库的连接器。
Debezium只能通过Docker安装。安装非常容易。只需从注册表中取出图像并运行它。给出更多说明here。部署后,Debezium Server默认情况下将在端口8083上运行。我们可以点击其REST API以创建连接器。
# view connectors
curl -i -X GET localhost:8083/connectors
# create a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @mysql-connector.json
# delete a connector
curl -i -X DELETE localhost:8083/connectors/<connector name>
Debezium和Mysql
mySQL维护二进制日志(binlog),该日志写入所有模式的更改,数据以与发生相同的顺序更改。 Debezium MySQL连接器读取Binlog并为每个模式和行级别更改产生更改事件。
创建MySQL用户
为了使Debezium读取Binlog,它应具有具有特定权限的用户。在部署MySQL连接器之前,让我们使用以下命令创建一个用户。
CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
FLUSH PRIVILEGES;
启用Binlogs
在自托管环境中启用Binlogs的过程很容易。指示给出了here。如果您使用的是AWS等托管服务,you can enable it from the console。此外,AWS RDS提供了一些存储的程序来设置此。
# show rds config
call mysql.rds_show_configuration;
# set binlog retention period
call mysql.rds_set_configuration('binlog retention hours', 96);
部署MySQL连接器
在部署mySQL连接器之前,我们需要准备JSON。
{
"name": "debezium-connector1",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "abc123",
"database.server.id": "130",
"topic.prefix": "prod-debezium",
"database.include.list": "mydatabase",
"table.include.list": "mydatabase.users,mydatabase.orders",
"snapshot.mode": "initial",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092,locahost:9093,localhost:9094",
"schema.history.internal.kafka.topic": "dbhistory.debezium",
"provide.transaction.metadata": "true",
"topic.creation.default.replication.factor": 2,
"topic.creation.default.partitions": 10,
"database.history.skip.unparseable.ddl": "true",
"event.processing.failure.handling.mode": "warn",
"snapshot.lock.timeout.ms": 180000,
}
}
让我们尝试了解每种配置的含义。我跳过自我解释的配置,仅提及重要的配置。
connector.class
-这是Debezium代码中连接器的途径。部署此连接器时,该特定类将运行并处理数据。因此,正确给出此配置很重要。
database.server.id
-连接器的唯一标识符。这区分了其他MySQL连接器。
topic.prefix
-在较旧版本中,它曾经是database.server.name
。这为连接器的当前实例提供了一个名称空间。 Debezium通过将其前缀加上表名称,即<topic.prefix>.<table_name>
创建KAFKA主题。对于连接器而言,这应该是唯一的,其他Debezium假定它是现有的连接器,并试图从停止的位置恢复。
数据库历史主题
客户端连接到数据库并读取其当前架构。但是模式可以随时更改。因此,Debezium连接器不能仅仅读取当前架构,因为它可能正在处理数据库的旧变更事件。它需要将当前变化与确切的模式相关联。因此,它需要存储模式更改的位置。
我们知道,MySQL会随着行级别的变化而变化。 Debezium读取DDL语句,准备表的内存表示形式,并将它们与Binlog位置一起存储在单独的主题中。该主题称为数据库历史主题。它配置为schema.history.internal.kafka.topic
。
当连接器重新启动时,它会读取模式历史主题并开始重建每个表的内存表示形式,并恢复阅读剩下的binlog。 Debezium连接器与行级变更一起排放架构,以便消费者系统可以重建整个桌子。
关于历史主题的一个非常重要的说明是它仅用于Debezium内部使用,不应被分区。意思是,它应该只有一个分区。为什么?因为在Kafka中,仅在分区级别而不是在主题级别上保证消息的顺序。因此,如果对其进行了分区,则将混合模式更改的顺序,从而导致混乱。
这是模式历史主题的数据看起来
{
"source" : {
"server" : "prod-debezium"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1674648853,
"file" : "mysql-bin-changelog.106869",
"pos" : 69735280,
"server_id" : 1229362390
},
"databaseName" : "licious",
"ddl" : "CREATE TABLE `user_groups` (\n\t`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,\n\t`user_id` bigint(20),\n\t`customer_key` varchar(25) NOT NULL,\n\tCONSTRAINT FOREIGN KEY `fk_user_id` (user_id) REFERENCES `users`(id) ON DELETE CASCADE,\n\tCONSTRAINT UNIQUE KEY `unique_user_group_id` (user_id, customer_key)\n) ENGINE=InnoDB DEFAULT CHARSET=latin1",
"tableChanges" : [ ]
}
模式更改主题
由于模式历史主题是用于Debezium的内部使用,因此它提供了一个模式更改主题,外部消费者可以在其中消费模式更改事件。主题名称可以使用topic.prefix
(较早的database.server.name
)
快照
debezium存储数据库的快照,以提供高容差。为了执行快照,连接器首先试图获得阻止其他客户端写作的全局读取锁,然后读取所有表格的架构并释放锁。获取锁非常重要,因为它有助于保持一致性,因为它阻止了此期间的写入。如果不可能进行全局读取锁,那么它将获取表级锁。有关它的更多信息here。
有不同的快照模式。它可以使用snapshot.mode
配置。最常用的模式是
initial
在需要架构更改时使用,并且从开始时,行级别更改。模式更改写入模式历史记录和模式更改主题,数据更改写入<topic.prefix>.<table_name>
。
schema_only
仅使用模式的快照。这很有用,如果您不希望表的全部数据相反,则只需要从部署的那一刻起就需要数据。如果您的表包含OLTP系统中的动态数据,则使用此模式。
when_needed
每当需要删除BINLOG或删除模式历史主题等时,请使用快照。
故障排除Debezium
MySQL Server不配置为使用ROW binlog_format,该连接器正常工作需要
io.debezium.DebeziumException: The MySQL server is not
configured to use a ROW binlog_format, which is required for this
connector to work properly. Change the MySQL configuration to use a
binlog_format=ROW and restart the connector. at
io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(MySqlConnectorTask.java:262) at
io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:86) at
io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130) at
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232) at
通过/etc/mysql.cnf
或在mySQL shell中执行以下命令。
SET GLOBAL binlog_format = 'ROW';
数据库历史主题 retention.ms 应为-1或大于5年
2022-01-06 13:08:55,904 WARN || Database history topic
'dbhistory.dev-jigyasa-licious' option 'retention.ms' should be '-1' or greater than '157680000000' (5 years)
but is '604800000' [io.debezium.relational.history.KafkaDatabaseHistory]
通过kafka cli。将数据库历史主题的retention.ms
设置为-1
连接器正在尝试读取binlog,但它不再在服务器上可用。
2022-01-06 13:08:56,405 ERROR ||
WorkerSourceTask{id=jigyasa-mysql-connector-licious-2-0} Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted
[org.apache.kafka.connect.runtime.WorkerTask] io.debezium.DebeziumException:
The connector is trying to read binlog starting at SourceInfo [currentGtid=null,
currentBinlogFilename=mysql-bin-changelog.032256, currentBinlogPosition=53270, currentRowNumber=0,
serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null],
but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
2022-01-06 13:08:56,405 INFO || Connector requires binlog file
'mysql-bin-changelog.032256', but MySQL only has mysql-bin-changelog.032918, mysql-bin-changelog.032919,
mysql-bin-changelog.032920 [io.debezium.connector.mysql.MySqlConnectorTask]
-
使用上述命令扩展了Binlog保留时间。
-
删除连接器并使用REST API再次重新删除它。
在二进制日志索引文件错误代码中找不到第一个日志文件名:1236; SQLSTATE:HY000。
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1217)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1172)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
... 3 more
如果您升级了MySQL并清理了Binlogs,通常会发生这种情况。删除连接器并使用REST API再次重新删除它。
io.debezium.debeziumexception:未能从位置1640249837读取下一个字节
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1217)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: Failed to read next byte from position 1640249837
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1172)
... 5 more
Caused by: java.io.EOFException: Failed to read next byte from position 1640249837
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:213)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:52)
at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)
at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:221)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
... 3 more
如果BINLOG包含连接器无法处理的任何特殊字符,则会发生这种情况。解决此问题的一种方法是将event.processing.failure.handling.mode
设置为在连接器配置中警告。
mysqltransactionRollbackexception:超过锁定超时;尝试重新启动交易
org.apache.kafka.connect.errors.ConnectException:An exception occurred in the change event producer. This connector will be
stopped.
at
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:115)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by:
io.debezium.DebeziumException: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait
timeout exceeded; try restarting transaction
at
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
at
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:153)
at
当Debezium连接器无法获得全局读取锁定或表级锁定时,就会发生这种情况。
-
将koude17增加到10-15分钟,然后重新部署连接器,以便Debezium连接器等待更多时间来获取锁。
-
如果上述解决方案没有帮助,则意味着数据库很忙。简而言之,杀死所有这些客户并重新部署连接器。
show open tables where in_use>0;
show full processlist;
kill <pid>;