使用MongoDB,Kafka,Debezium和Risingwave的实时数据处理管道
#sql #database #体系结构 #kafka

今天,对实时数据处理和分析的需求比以往任何时候都高。现代数据生态系统需要工具和技术,不仅可以捕获,存储和处理大量数据,而且还应实时提供见解。本文将涵盖 mongodb,kafka,debezium和Risingwave 的强大组合,以分析实时数据,它们如何共同工作以及使用这些开源技术的好处。

了解Mongodb,Kafka,Debezium和Risingwave

在我们深入了解实施细节之前,重要的是要了解这些工具是什么以及它们的作用。

  1. Debezium :是开源分布式平台,用于更改数据捕获(CDC)。 CDC是一种跟踪写入源数据库的数据更改并自动同步目标数据库的技术。例如,Debezium的MongoDB Connector可以在实时发生数据库和集合中的文档更改,将这些更改记录为Kafka主题中的事件。
  2. RisingWave :是分布式开源SQL数据库用于流处理。它的主要目标是使实时运行的应用程序更容易,更便宜。当它采用流数据时,RisingWave对每个新数据进行了直通计算,并迅速更新结果。例如,RisingWave接受来自sources(例如Kafka)的数据,为复杂数据构建了实质性的视图,您可以使用SQL查询它们。

分析实时数据:管道

一旦我们了解了每个工具,让我们讨论MongoDB,Kafka,Debezium和RisingWave如何共同努力以创建有效的实时数据分析管道。这些技术是免费使用的,并且易于集成

efficient real-time data analysis pipeline

  1. MongoDB中的数据生成和存储:我们的数据管道始于MongoDB中数据的生成和存储。鉴于MongoDB的灵活数据模型,可以以多种格式存储数据,使其适用于不同的数据源。
  2. 使用Debezium 捕获数据:管道的下一步是使用Debezium在MongoDB中捕获更改(所有插入,更新和删除)。 Debezium为MongoDB提供了一个CDC connector,可以捕获数据库中的行级变化。一旦捕获更改,它们就会发送到卡夫卡进行处理。
  3. 使用Kafka的数据流:Kafka从Debezium接收数据,然后照顾将其流式传输给消费者。在我们的情况下,我们将数据与RisingWave一起消费。
  4. 使用RisingWave的数据处理:最后,通过RisingWave接收并处理流数据。 RisingWave为复杂的事件处理和流分析提供了高级SQL接口。处理后的数据可以传递给BI and Data analytics platforms或用于实时决策,异常检测,预测分析等。

该管道的关键优势在于其处理大量数据,实时过程事件并产生最小延迟的见解的能力。例如,该解决方案可用于构建全球酒店搜索平台,以获取有关酒店价格和可用性的实时更新。当该平台的主要数据库中的费率或可用性变化时,Debezium捕获了这种变化并将其流式传输到Kafka,而RisingWave可以进行趋势分析。这样可以确保用户在搜索酒店时始终看到最新信息。

如何集成QuickStart

本指南向您展示了如何从技术上配置MongoDB Debezium Connector将数据从MongoDB发送到Kafka主题并将数据发送到RisingWave。

完成本指南后,您应该了解如何使用这些工具来创建实时数据处理管道,并在RisingWave中创建data sourcematerialized view,以使用SQL查询分析数据。

要完成本指南中的步骤,您必须下载/克隆并在GitHub上的existing sample project上工作。该项目为方便和一致性使用Docker。它提供了一个容器化的开发环境,其中包括您需要构建示例数据管道的服务

在你开始之前

要在本地环境中运行该项目,您需要以下内容。

开始项目

Docker-Compose文件在Docker容器中启动以下服务:

  • RisingWave数据库。
  • MongoDB,被配置为副本集。
  • debezium。
  • Python应用程序生成MongoDB的随机数据。
  • Redpanda安装了MongoDB Debezium连接器。我们将Redpanda用作Kafka经纪人。
  • kafka Connect UI以管理和监视我们的连接器。

启动项目简单地从教程目录中运行以下命令

docker compose up

启动该项目时,Docker下载需要运行的任何图像。您可以在docker-compose.yaml文件中查看服务的完整列表。

数据流

App.py生成随机用户数据(名称,地址和电子邮件),并将其插入MongoDB users Collection。因为我们配置了Debezium MongoDB连接器,以指向MongoDB数据库和要监视的集合,因此它可以实时捕获数据,并将它们沉入Redpanda中,将其降低到称为dbserver1.random_data.users的Kafka主题中。接下来的步骤,我们消费Kafka事件,并使用RisingWave创建实现的视图。

创建数据源

要使用RisingWave消费Kafka主题,我们首先需要设置data source。在演示项目中,应将KAFKA定义为数据源。打开一个新的终端窗口并运行以连接到RisingWave:

psql -h localhost -p 4566 -d dev -U root

作为一个数据库,您可以直接为Kafka主题创建表:

CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.random_data.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_MONGO_JSON;

具有实体视图的数据标准化

为了使用户的数据归一化,我们在RisingWave中创建了一个实现的视图:

CREATE MATERIALIZED VIEW normalized_users AS
SELECT
    payload ->> 'name' as name,
    payload ->> 'email' as email,
    payload ->> 'address' as address
FROM
    users;

实体视图的主要好处是,它们保存了执行复杂的连接,聚合或计算所需的计算。

,没有每次查询数据时运行这些操作,而是要预先计算结果。

查询数据

使用SELECT命令在物化视图中查询数据。让我们看看normalized_users物质观点的最新结果:

SELECT
    *
FROM
    normalized_users
LIMIT
    10;

响应您的查询,可能的结果集(随机数据)可能看起来像:

id 名称 电子邮件 地址
1 John Doe mailto:john.doe@example.com 1234 ELM ST,美国Anytown,USA
2 简·史密斯 mailto:jane.smith@example.com 2345 Oak St,Anytown,USA
3 鲍勃·约翰逊 mailto:bob.johnson@example.com 3456 Pine St,Anytown,USA
4 爱丽丝·威廉姆斯 邮件:RisingWave 4567 Maple St,Anytown,USA
5 Charlie Brown mailto:charlie.brown@example.com 5678 Cedar St,Anytown,USA
6 艾米丽·戴维斯(Emily Davis) mailto:emily.davis@example.com 6789 Birch St,Anytown,USA
7 弗兰克·米勒(Frank Miller) mailto:frank.miller@example.com 7890 Spruce St,Anytown,USA
8 格蕾丝·威尔逊 mailto:grace.wilson@example.com 8901 Ash St,Anytown,USA
9 亨利·摩尔 mailto:henry.moore@example.com 9012 Alder St,Anytown,USA
10 伊莎贝拉·泰勒 邮件:RisingWave 0123 Cherry St,Anytown,USA

概括

这是用于使用MongoDB,Kafka,Debezium和RisingWave进行实时数据处理管道的基本设置。可以根据您的特定需求对设置进行调整,例如添加更多Kafka主题,跟踪多个MongoDB集合的变化,实现更复杂的数据处理逻辑,或在Risingwave中组合multiple streams

相关资源

建议的内容

社区

°1âJoin the Risingwave Community

关于作者