这是有关使用孟菲斯构建现代事件驱动系统的一系列博客文章的第二部分。
在我们的last blog post中,我们引入了使用Debezium Server和Memphis.dev从PostgreSQL数据库中捕获更改数据捕获(CDC)事件的参考实现。通过用Memphis.dev替换Apache Kafka,该解决方案大大降低了运营资源和间接费用 - 节省资金并释放开发人员专注于构建新功能。
PostgreSQL是唯一常用的数据库。 Debezium为一系列数据库提供连接器,包括非依赖文档数据库MongoDB。 MongoDB在开发人员中很受欢迎,尤其是那些从事动态编程语言的人,因为它避免了对象相关的阻抗不匹配。开发人员可以在数据库中直接存储,查询和更新对象。
在此博客文章中,我们演示了如何使CDC解决方案适应MongoDB。
解决方案的概述
在这里,我们描述了使用Memphis.dev传递更改数据捕获事件的参考解决方案的体系结构。该体系结构没有从our previous blog帖子更改,除了用MongoDB替换PostgreSQL。
一个todo项目生成器脚本将随机生成的记录写入mongoDB。 Debezium Server从MongoDB接收CDC事件,并通过HTTP客户端接收器将其转发到Memphis REST gateway。孟菲斯休息网关将消息添加到孟菲斯的站点。最后,一个消费者脚本对孟菲斯进行了民意调查,以获取新消息并将其打印到控制台。
- todo Item Generator :每0.5秒插入MongoDB集合中随机生成的TODO项目。每个待办事项都包含一个描述,创建时间戳,可选的到期日和完成状态。
- mongodb :使用一个包含一个集合(todo_items)的单个数据库配置。
- Debezium Server :配置了MongoDB源和HTTP客户端接收器连接器的Debezium Server的实例。
- MEMPHIS.DEV REST网关:使用开箱即用的配置。
- MEMPHIS.DEV :配置了一个站(TODO-CDC-事件)和单个用户(TODOCDCService)
- p* rinting消费者*:一个使用孟菲斯的脚本。devpython sdk消耗消息并将其打印到控制台
入门
该实现教程可在Memphis Example Solutions存储库的MongoDB-Debezium-CDC-example目录中找到。将需要Docker Compose运行它。
运行实施
为Debezium服务器,打印消费者和数据库设置构建Docker映像(表和用户创建)。
当前,实现取决于JWT身份验证支持的Debezium Server的预释放版本。 Docker映像将直接从Debezium和Debezium Server存储库的主要分支中构建。请注意,此步骤可能需要一段时间(约20分钟)才能运行。当Debezium Server 2.3.0发布时,我们将切换到上游Docker Image。
步骤1:构建图像
$ docker compose build --pull --no-cache
步骤2:启动孟菲斯。DEV经纪和REST Gateway
启动Memphis.dev broker和REST gateway。请注意,孟菲斯 - 雷斯特门口服务取决于孟菲斯经纪服务,因此也将启动经纪人服务。
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
步骤3:在孟菲斯中创建一个站和相应的用户。dev
消息将传递到孟菲斯的站点。它们等同于消息经纪人使用的主题。将您的浏览器指向http://localhost:9000/。单击页面底部的root链接的登录。
使用root(用户名)和孟菲斯(密码)登录。
按照向导创建一个名为todo-cdc events的站点。
创建一个名为todocdcservice的用户,具有相同的密码值。
单击向导完成之前:
单击“转到站概述”以转到站概述页面。
步骤4:启动印刷消费者
我们使用Memphis.dev Python SDK创建了一个consumer script,该consumer script对TODO-CDC-Events Station进行了调查并将消息打印到控制台。
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started 1.4s
步骤5:启动和配置MongoDB
为了捕获更改,必须启用MongoDB的replication功能。有几个步骤:
-
必须设置副本集名称。这可以由passing the name of a replica set在命令行或配置文件中完成。在Docker Compose file中,我们使用命令行参数REPLET RS0运行mongoDB来设置副本集名称。
-
使用复制并启用授权时,必须向每个副本实例提供一个通用的密钥文件。我们在MongoDB文档中的instructions之后生成了一个密钥文件。然后,我们通过包含密钥文件来扩展官方MongoDB图像的built an image。
-
一旦MongoDB运行,就需要初始化复制品集。我们在启动上配置实例的use a script。脚本调用replSetInitiate命令,其中包含复制品集中每个MongoDB实例的IP地址和端口的列表。此命令使MongoDB实例相互交流并选择一个领导者。
一般而言,复制品集用于提高可靠性(高可用性)。您发现的大多数文档都描述了如何设置具有多个mongoDB实例的复制品集。在我们的情况下,Debezium s MongoDB连接器从复制功能中脱离了数据更改事件。尽管我们通过配置副本集的步骤,但我们仅使用一个mongodb实例。
todo item generator script每半秒就会创建一个新的汤托物品。场值是随机生成的。这些项目被添加到名为“ todo_items”的mongodb集合中。
在Docker组成的文件中,TODO ITEM GENTATOR脚本配置为依赖于以健康状态运行的MongoDB实例,并成功完成了数据库设置脚本。通过启动TODO ITEM GENERATOR脚本,Docker Compose还将启动MongoDB并运行数据库设置脚本。
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started 9.1s
步骤6:启动Debezium Server
需要开始的最后一个服务是Debezium Server。该服务器通过Java properties file配置了MongoDB和HTTP客户端连接器的源连接器:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
大多数选项都是不言自明的。 HTTP客户端接收器URL值得详细解释。 Memphis.dev REST gateway希望以以下格式接收以路径的邮政请求:
/stations/{station}/proce/{ventity}
{station}占位符被替换为将消息发送到的电台的名称。 {数量}占位符被值单(对于单个消息)或批次(对于多个消息)代替。
作为邮政请求的有效载荷传递消息(s)。 REST网关支持三种消息格式(纯文本,JSON或协议缓冲区)。内容类型标头字段的值(text/application/json,application/x-protobuf)确定如何解释有效负载。
DeBezium Server的HTTP客户端接收器会产生与这些模式一致的REST请求。请求使用帖子动词,每个请求包含单个JSON编码的消息作为有效载荷,并且将内容类型的标头设置为应用程序/JSON。我们使用todo-cdc events作为站名称和端点URL中的单个数量值来路由消息,并指示其余网关应如何解释请求:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.authentication.type = jwt属性指示HTTP客户端接收器应使用JWT身份验证。用户名和密码属性是不言而喻的,但是debezium.sink.http.authentication.jwt.url属性值得一些解释。使用 /auth /Authenticate端点获取初始令牌,而使用单独的 /auth /Refreshtoken端点对身份验证进行刷新。 HTTP客户端中的JWT身份验证将适当的端点附加到给定的基本网址。
Debezium Server可以从以下命令开始:
$ docker compose up -d debezium-server
步骤7:确认系统正在起作用
检查MEMPHIS.DEV WEB UI中的TODO-CDC-Events Station概述屏幕,以确认生产者和消费者已连接并发送消息。
And, print the logs for the printing-consumer container:
消息:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
CDC消息的格式
传入消息的格式为JSON。消息有两个顶级字段(架构和有效负载)。该架构描述了记录架构(字段名称和类型),而有效载荷描述了记录的更改。有效负载对象本身包含两个字段(前后),指示更改前后的记录值。
对于mongodb,debezium服务器将记录编码为串行的JSON:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
这将对消息的下游处理有影响,我们将在本系列的未来博客文章中描述。
恭喜!现在,您有一个工作示例,说明如何使用Debezium Server从MongoDB数据库捕获数据更改事件,然后将事件转移到孟菲斯。dev进行下游处理。
第3部分即将出来!如果您想通知,请确保订阅右侧的孟菲斯。
如果您错过了第1部分:
Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events
关注我们以获取最新更新!
Github ⢠Docs ⢠Discord
Join 4500+ others and sign up for our data engineering newsletter.