第3部分:转换MongoDB CDC事件消息
#mongodb #memphisdev #cdc #dataprocessing

这是有关使用孟菲斯构建现代事件驱动系统的一系列博客文章的第三部分。

在我们的last blog post中,我们引入了使用Debezium ServerMemphis.devMongoDB数据库中捕获变更数据捕获(CDC)事件的参考实现。在帖子的结尾,我们注意到MongoDB记录被序列化为Debezium CDC消息中的字符串,例如:

{
    "schema" : ...,

"payload" : {
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",

...
}
}

我们想使用孟菲斯的Schemaverse功能来检查有关预期模式的消息。不匹配模式的消息被路由到一个死信件站,因此它们不会影响下游消费者。如果这一切听起来像古希腊,那就不用担心!我们将在下一篇博客文章中解释细节。

要使用诸如schemaverse之类的功能,我们需要将MongoDB记录作为JSON文档进行序列化。在这篇博客文章中,我们描述了MongoDB CDC管道的修改,该管道添加了变压器服务,以将MongoDB记录列为JSON文档。


解决方案的概述

previous solution由六个组件组成:

  1. todo Item Generator :每0.5秒插入MongoDB集合中随机生成的TODO项目。每个待办事项都包含一个描述,创建时间戳,可选的到期日和完成状态。

  2. mongodb :配置一个包含一个集合(todo_items)的单个数据库。

  3. Debezium Server :配置了MongoDB源和HTTP客户端接收器连接器的Debezium Server实例。

  4. MEMPHIS.DEV REST GATEWAY :使用开箱即用的配置。

  5. MEMPHIS.DEV :配置了一个站(todo-cdc- evests)和单个用户(todocdcservice)。

  6. 打印消费者:使用MEMPHIS.DEV PYTHON SDK消耗消息并将其打印到控制台的脚本。

mongocdcd example

在此迭代中,我们添加了两个其他组件:

  • 变压器服务:一种消耗来自todo-cdc-events站的消息的transformer服务,对MongoDB记录进行了挑战,并将其推到了清洁的todo-cdc-events站。

  • 清洁打印消费者:印刷消费者的第二个实例将被推到清洁的todo-cdc-events站。

我们更新的架构看起来像这样:

data flow diagram


深入了解变压器服务

消息变压器服务的骨架

我们的transformer服务使用Memphis.dev Python SDK。让我们浏览变压器的实现。我们的变压器的主要()方法首先连接到Memphis.dev broker。连接详细信息是从环境变量中获取的。根据Twelve-Factor App manifesto的建议,使用环境变量传递了主机,用户名,密码,输入站名称和输出站名称。

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

建立连接后,我们就会创建消费者和生产者对象。在Memphis.dev中,消费者和生产者有名字。这些名称出现在Memphis.dev UI中,为系统操作提供了透明度。

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

消费者API使用callback function设计模式。从经纪人中撤出消息时,提供的功能将以消息列表为参数调用。

  print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

设置回调后,我们启动了Asyncio事件循环。此时,变压器服务暂停并等待,直到可以从经纪人那里拉出消息。

保持主线程的活力,因此消费者将继续接收数据

await asyncio.Event().wait()


创建消息处理程序功能

消息处理程序的创建函数采用生产者对象并返回回调函数。由于回调函数仅采用一个参数,因此我们使用closure pattern在创建它时隐式将生产者传递给MSG_HANDLER函数。

称为msg_handler函数时将传递三个参数:消息列表,错误(如果发生)以及由字典组成的上下文。我们的处理程序循环在消息上循环,调用每个函数的转换功能,使用生产者将消息发送到第二个站,并确认已处理了该消息。在孟菲斯。dev中,直到消费者承认它们之前,不会将消息标记为传递。如果在处理过程中发生错误,则可以防止消息删除。

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

消息变压器功能

现在,我们获得了服务的肉:消息变压器功能。消息有效载荷(由get_data()方法返回)存储为bytearray对象。我们使用Python JSON库将消息对python集合(列表和dict)和原始类型(int,float,str和none)的层次结构进行序列化。

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

我们希望对象具有有效载荷属性,其中一个对象作为值。然后,该对象具有两个属性(之前和之后),它们是无或包含串行的JSON对象的字符串。我们再次使用JSON库将字符串替换为对象。

 if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

最后,我们将整个JSON记录重新化,然后将其转换为Bytearray以进行传输到经纪人。

  output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

hooray!我们的对象现在看起来像:

{
"schema" : ...,

"payload" : {
"before" : null,

"after" : {
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

运行变压器服务

如果您遵循previous blog post中的7个步骤,则只需执行三个步骤即可。启动变压器服务并验证其工作是否有效:

步骤8:启动变压器服务

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

步骤9:启动第二个打印消费者

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

步骤10:检查孟菲斯UI

当变压器开始向MEMPHIS.DEV生成消息时,将创建一个名为“清洁todo-cdc-事件”的第二个站。您应该在孟菲斯的车站概述页面上看到这个新车站。

Check memphis ui

“清洁todo-cdc events”页面的详细信息页面应显示与生产者,印刷消费者和转换消息相关的变压器:

Image description

恭喜!现在,我们准备在下一篇博客文章中使用schemaverse来解决验证消息。订阅我们的新闻通讯以保持关注!


如果您错过了第1和2部分:

Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev

Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events


最初由Memphis.dev的开发人员倡导者RJ Nowling在Memphis.dev上发布


关注我们以获取最新更新!
GithubDocsDiscord