使用Debezium更改数据捕获
#database #debezium

介绍

什么是更改数据捕获?在一个数据源中发生的更改将在另一个数据源中捕获并复制。让我们用图片理解它。

Image showing debezium

a :数据库保持交易日志,其中所有DDL&DML查询都在其中。 MySQL维护Binlog和PostgreSQL维护WAL(写入日志)。

B&C :源连接器连接到数据库并读取事务日志。然后,它准备了变更的内存结构,并将其写入消息代理。通常,CDC系统维护更改的单个架构,以使不同的数据库连接器将数据转换为其中。

d&e :接收器连接器连接到消息经纪,读取数据并将更改写入目标数据库。

CDC的用例

让我们考虑软件工程中CDC的不同用例。

  1. OLAP(在线分析处理)系统使用CDC将数据从交易数据库迁移到分析数据库。

  2. OLTP(在线交易处理)系统还可以使用CDC作为事件总线来复制不同的数据存储中的数据。例如,从MySQL到Elasticsearch。

流行和广泛使用的系统之一是debezium。在此博客文章中,我们将讨论如何设置Debezium及其内部工作方式。

Debezium

Debezium在Kafka Connect的顶部工作。 Kafka Connect是在多个系统之间流式传输数据的框架。部署后,它将提供REST API来管理连接器。有两个连接器,源和水槽。从上图中看到,源连接器读取源数据库并将数据写入KAFKA主题。接收器连接器从这些KAFKA主题中读取并写入目标数据库。 Debezium是使用Kafka Connect框架构建的。它带有不同数据库的连接器。

Debezium只能通过Docker安装。安装非常容易。只需从注册表中取出图像并运行它。给出更多说明here。部署后,Debezium Server默认情况下将在端口8083上运行。我们可以点击其REST API以创建连接器。

# view connectors
curl -i -X GET localhost:8083/connectors
# create a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @mysql-connector.json
# delete a connector
curl -i -X DELETE localhost:8083/connectors/<connector name>

Debezium和Mysql

mySQL维护二进制日志(binlog),该日志写入所有模式的更改,数据以与发生相同的顺序更改。 Debezium MySQL连接器读取Binlog并为每个模式和行级别更改产生更改事件。

创建MySQL用户

为了使Debezium读取Binlog,它应具有具有特定权限的用户。在部署MySQL连接器之前,让我们使用以下命令创建一个用户。

CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
FLUSH PRIVILEGES;

启用Binlogs

在自托管环境中启用Binlogs的过程很容易。指示给出了here。如果您使用的是AWS等托管服务,you can enable it from the console。此外,AWS RDS提供了一些存储的程序来设置此。

# show rds config
call mysql.rds_show_configuration;
# set binlog retention period
call mysql.rds_set_configuration('binlog retention hours', 96); 

部署MySQL连接器

在部署mySQL连接器之前,我们需要准备JSON。

{
  "name": "debezium-connector1",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "abc123",
    "database.server.id": "130",
    "topic.prefix": "prod-debezium",
    "database.include.list": "mydatabase",
    "table.include.list": "mydatabase.users,mydatabase.orders",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092,locahost:9093,localhost:9094",
    "schema.history.internal.kafka.topic": "dbhistory.debezium",
    "provide.transaction.metadata": "true",
    "topic.creation.default.replication.factor": 2,
    "topic.creation.default.partitions": 10,
    "database.history.skip.unparseable.ddl": "true",
    "event.processing.failure.handling.mode": "warn",
    "snapshot.lock.timeout.ms": 180000,
  }
}

让我们尝试了解每种配置的含义。我跳过自我解释的配置,仅提及重要的配置。

connector.class-这是Debezium代码中连接器的途径。部署此连接器时,该特定类将运行并处理数据。因此,正确给出此配置很重要。

database.server.id-连接器的唯一标识符。这区分了其他MySQL连接器。

topic.prefix-在较旧版本中,它曾经是database.server.name。这为连接器的当前实例提供了一个名称空间。 Debezium通过将其前缀加上表名称,即<topic.prefix>.<table_name>创建KAFKA主题。对于连接器而言,这应该是唯一的,其他Debezium假定它是现有的连接器,并试图从停止的位置恢复。

数据库历史主题

客户端连接到数据库并读取其当前架构。但是模式可以随时更改。因此,Debezium连接器不能仅仅读取当前架构,因为它可能正在处理数据库的旧变更事件。它需要将当前变化与确切的模式相关联。因此,它需要存储模式更改的位置。

我们知道,MySQL会随着行级别的变化而变化。 Debezium读取DDL语句,准备表的内存表示形式,并将它们与Binlog位置一起存储在单独的主题中。该主题称为数据库历史主题。它配置为schema.history.internal.kafka.topic

当连接器重新启动时,它会读取模式历史主题并开始重建每个表的内存表示形式,并恢复阅读剩下的binlog。 Debezium连接器与行级变更一起排放架构,以便消费者系统可以重建整个桌子。

关于历史主题的一个非常重要的说明是它仅用于Debezium内部使用,不应被分区。意思是,它应该只有一个分区。为什么?因为在Kafka中,仅在分区级别而不是在主题级别上保证消息的顺序。因此,如果对其进行了分区,则将混合模式更改的顺序,从而导致混乱。

这是模式历史主题的数据看起来

{
  "source" : {
    "server" : "prod-debezium"
  },
  "position" : {
    "transaction_id" : null,
    "ts_sec" : 1674648853,
    "file" : "mysql-bin-changelog.106869",
    "pos" : 69735280,
    "server_id" : 1229362390
  },
  "databaseName" : "licious",
  "ddl" : "CREATE TABLE `user_groups` (\n\t`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,\n\t`user_id` bigint(20),\n\t`customer_key` varchar(25) NOT NULL,\n\tCONSTRAINT FOREIGN KEY `fk_user_id` (user_id) REFERENCES `users`(id) ON DELETE CASCADE,\n\tCONSTRAINT UNIQUE KEY `unique_user_group_id` (user_id, customer_key)\n) ENGINE=InnoDB DEFAULT CHARSET=latin1",
  "tableChanges" : [ ]
}

模式更改主题

由于模式历史主题是用于Debezium的内部使用,因此它提供了一个模式更改主题,外部消费者可以在其中消费模式更改事件。主题名称可以使用topic.prefix(较早的database.server.name

配置

快照

debezium存储数据库的快照,以提供高容差。为了执行快照,连接器首先试图获得阻止其他客户端写作的全局读取锁,然后读取所有表格的架构并释放锁。获取锁非常重要,因为它有助于保持一致性,因为它阻止了此期间的写入。如果不可能进行全局读取锁,那么它将获取表级锁。有关它的更多信息here

有不同的快照模式。它可以使用snapshot.mode配置。最常用的模式是

initial在需要架构更改时使用,并且从开始时,行级别更改。模式更改写入模式历史记录和模式更改主题,数据更改写入<topic.prefix>.<table_name>

schema_only仅使用模式的快照。这很有用,如果您不希望表的全部数据相反,则只需要从部署的那一刻起就需要数据。如果您的表包含OLTP系统中的动态数据,则使用此模式。

when_needed每当需要删除BINLOG或删除模式历史主题等时,请使用快照。

故障排除Debezium

MySQL Server不配置为使用ROW binlog_format,该连接器正常工作需要

io.debezium.DebeziumException: The MySQL server is not
configured to use a ROW binlog_format, which is required for this
connector to work properly. Change the MySQL configuration to use a
binlog_format=ROW and restart the connector.    at
io.debezium.connector.mysql.MySqlConnectorTask.validateBinlogConfiguration(MySqlConnectorTask.java:262) at
io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:86) at
io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)  at
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)    at

通过/etc/mysql.cnf或在mySQL shell中执行以下命令。

SET GLOBAL binlog_format = 'ROW';

参考-MySQL Documentation

数据库历史主题 retention.ms 应为-1或大于5年

2022-01-06 13:08:55,904 WARN   ||  Database history topic
'dbhistory.dev-jigyasa-licious' option 'retention.ms' should be '-1' or greater than '157680000000' (5 years)
but is '604800000'   [io.debezium.relational.history.KafkaDatabaseHistory]

通过kafka cli。将数据库历史主题的retention.ms设置为-1

连接器正在尝试读取binlog,但它不再在服务器上可用。

2022-01-06 13:08:56,405 ERROR  || 
WorkerSourceTask{id=jigyasa-mysql-connector-licious-2-0} Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted  
[org.apache.kafka.connect.runtime.WorkerTask] io.debezium.DebeziumException:
The connector is trying to read binlog starting at SourceInfo [currentGtid=null,
currentBinlogFilename=mysql-bin-changelog.032256, currentBinlogPosition=53270, currentRowNumber=0,
serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null],
but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.  


2022-01-06 13:08:56,405 INFO   ||  Connector requires binlog file
'mysql-bin-changelog.032256', but MySQL only has mysql-bin-changelog.032918, mysql-bin-changelog.032919,
mysql-bin-changelog.032920   [io.debezium.connector.mysql.MySqlConnectorTask]  
  1. 使用上述命令扩展了Binlog保留时间。

  2. 删除连接器并使用REST API再次重新删除它。

在二进制日志索引文件错误代码中找不到第一个日志文件名:1236; SQLSTATE:HY000。

org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1217)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1172)
    ... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
    ... 3 more

如果您升级了MySQL并清理了Binlogs,通常会发生这种情况。删除连接器并使用REST API再次重新删除它。

io.debezium.debeziumexception:未能从位置1640249837读取下一个字节

org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1217)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: Failed to read next byte from position 1640249837
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1172)
    ... 5 more
Caused by: java.io.EOFException: Failed to read next byte from position 1640249837
    at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:213)
    at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:52)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:221)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
    ... 3 more

如果BINLOG包含连接器无法处理的任何特殊字符,则会发生这种情况。解决此问题的一种方法是将event.processing.failure.handling.mode设置为在连接器配置中警告。

mysqltransactionRollbackexception:超过锁定超时;尝试重新启动交易

org.apache.kafka.connect.errors.ConnectException:An exception occurred in the change event producer. This connector will be
stopped.
    at
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:115)
    at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by:
io.debezium.DebeziumException: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait
timeout exceeded; try restarting transaction
    at
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
    at
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:153)
    at

当Debezium连接器无法获得全局读取锁定或表级锁定时,就会发生这种情况。

  1. koude17增加到10-15分钟,然后重新部署连接器,以便Debezium连接器等待更多时间来获取锁。

  2. 如果上述解决方案没有帮助,则意味着数据库很忙。简而言之,杀死所有这些客户并重新部署连接器。

show open tables where in_use>0;
show full processlist;
kill <pid>;