今天,对实时数据处理和分析的需求比以往任何时候都高。现代数据生态系统需要工具和技术,不仅可以捕获,存储和处理大量数据,而且还应实时提供见解。本文将涵盖 mongodb,kafka,debezium和Risingwave 的强大组合,以分析实时数据,它们如何共同工作以及使用这些开源技术的好处。
了解Mongodb,Kafka,Debezium和Risingwave
在我们深入了解实施细节之前,重要的是要了解这些工具是什么以及它们的作用。
- Debezium :是开源分布式平台,用于更改数据捕获(CDC)。 CDC是一种跟踪写入源数据库的数据更改并自动同步目标数据库的技术。例如,Debezium的MongoDB Connector可以在实时发生数据库和集合中的文档更改,将这些更改记录为Kafka主题中的事件。
- RisingWave :是分布式开源SQL数据库用于流处理。它的主要目标是使实时运行的应用程序更容易,更便宜。当它采用流数据时,RisingWave对每个新数据进行了直通计算,并迅速更新结果。例如,RisingWave接受来自sources(例如Kafka)的数据,为复杂数据构建了实质性的视图,您可以使用SQL查询它们。
分析实时数据:管道
一旦我们了解了每个工具,让我们讨论MongoDB,Kafka,Debezium和RisingWave如何共同努力以创建有效的实时数据分析管道。这些技术是免费使用的,并且易于集成。
- MongoDB中的数据生成和存储:我们的数据管道始于MongoDB中数据的生成和存储。鉴于MongoDB的灵活数据模型,可以以多种格式存储数据,使其适用于不同的数据源。
- 使用Debezium 捕获数据:管道的下一步是使用Debezium在MongoDB中捕获更改(所有插入,更新和删除)。 Debezium为MongoDB提供了一个CDC connector,可以捕获数据库中的行级变化。一旦捕获更改,它们就会发送到卡夫卡进行处理。
- 使用Kafka的数据流:Kafka从Debezium接收数据,然后照顾将其流式传输给消费者。在我们的情况下,我们将数据与RisingWave一起消费。
- 使用RisingWave的数据处理:最后,通过RisingWave接收并处理流数据。 RisingWave为复杂的事件处理和流分析提供了高级SQL接口。处理后的数据可以传递给BI and Data analytics platforms或用于实时决策,异常检测,预测分析等。
该管道的关键优势在于其处理大量数据,实时过程事件并产生最小延迟的见解的能力。例如,该解决方案可用于构建全球酒店搜索平台,以获取有关酒店价格和可用性的实时更新。当该平台的主要数据库中的费率或可用性变化时,Debezium捕获了这种变化并将其流式传输到Kafka,而RisingWave可以进行趋势分析。这样可以确保用户在搜索酒店时始终看到最新信息。
如何集成QuickStart
本指南向您展示了如何从技术上配置MongoDB Debezium Connector将数据从MongoDB发送到Kafka主题并将数据发送到RisingWave。
完成本指南后,您应该了解如何使用这些工具来创建实时数据处理管道,并在RisingWave中创建data source和materialized view,以使用SQL查询分析数据。
要完成本指南中的步骤,您必须下载/克隆并在GitHub上的existing sample project上工作。该项目为方便和一致性使用Docker。它提供了一个容器化的开发环境,其中包括您需要构建示例数据管道的服务。
在你开始之前
要在本地环境中运行该项目,您需要以下内容。
- Git
- 确保您在环境中安装了Docker。
- 确保在您的环境中安装了PostgreSQL交互式终端PSQL。有关详细说明,请参见Download PostgreSQL。
开始项目
Docker-Compose文件在Docker容器中启动以下服务:
- RisingWave数据库。
- MongoDB,被配置为副本集。
- debezium。
- Python应用程序生成MongoDB的随机数据。
- Redpanda安装了MongoDB Debezium连接器。我们将Redpanda用作Kafka经纪人。
- kafka Connect UI以管理和监视我们的连接器。
启动项目简单地从教程目录中运行以下命令
docker compose up
启动该项目时,Docker下载需要运行的任何图像。您可以在docker-compose.yaml文件中查看服务的完整列表。
数据流
App.py生成随机用户数据(名称,地址和电子邮件),并将其插入MongoDB users
Collection。因为我们配置了Debezium MongoDB连接器,以指向MongoDB数据库和要监视的集合,因此它可以实时捕获数据,并将它们沉入Redpanda中,将其降低到称为dbserver1.random_data.users
的Kafka主题中。接下来的步骤,我们消费Kafka事件,并使用RisingWave创建实现的视图。
创建数据源
要使用RisingWave消费Kafka主题,我们首先需要设置data source。在演示项目中,应将KAFKA定义为数据源。打开一个新的终端窗口并运行以连接到RisingWave:
psql -h localhost -p 4566 -d dev -U root
作为一个数据库,您可以直接为Kafka主题创建表:
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
connector = 'kafka',
kafka.topic = 'dbserver1.random_data.users',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_MONGO_JSON;
具有实体视图的数据标准化
为了使用户的数据归一化,我们在RisingWave中创建了一个实现的视图:
CREATE MATERIALIZED VIEW normalized_users AS
SELECT
payload ->> 'name' as name,
payload ->> 'email' as email,
payload ->> 'address' as address
FROM
users;
实体视图的主要好处是,它们保存了执行复杂的连接,聚合或计算所需的计算。
,没有每次查询数据时运行这些操作,而是要预先计算结果。
查询数据
使用SELECT
命令在物化视图中查询数据。让我们看看normalized_users
物质观点的最新结果:
SELECT
*
FROM
normalized_users
LIMIT
10;
响应您的查询,可能的结果集(随机数据)可能看起来像:
id | 名称 | 电子邮件 | 地址 |
---|---|---|---|
1 | John Doe | mailto:john.doe@example.com | 1234 ELM ST,美国Anytown,USA |
2 | 简·史密斯 | mailto:jane.smith@example.com | 2345 Oak St,Anytown,USA |
3 | 鲍勃·约翰逊 | mailto:bob.johnson@example.com | 3456 Pine St,Anytown,USA |
4 | 爱丽丝·威廉姆斯 | 邮件:RisingWave | 4567 Maple St,Anytown,USA |
5 | Charlie Brown | mailto:charlie.brown@example.com | 5678 Cedar St,Anytown,USA |
6 | 艾米丽·戴维斯(Emily Davis) | mailto:emily.davis@example.com | 6789 Birch St,Anytown,USA |
7 | 弗兰克·米勒(Frank Miller) | mailto:frank.miller@example.com | 7890 Spruce St,Anytown,USA |
8 | 格蕾丝·威尔逊 | mailto:grace.wilson@example.com | 8901 Ash St,Anytown,USA |
9 | 亨利·摩尔 | mailto:henry.moore@example.com | 9012 Alder St,Anytown,USA |
10 | 伊莎贝拉·泰勒 | 邮件:RisingWave | 0123 Cherry St,Anytown,USA |
概括
这是用于使用MongoDB,Kafka,Debezium和RisingWave进行实时数据处理管道的基本设置。可以根据您的特定需求对设置进行调整,例如添加更多Kafka主题,跟踪多个MongoDB集合的变化,实现更复杂的数据处理逻辑强>,或在Risingwave中组合multiple streams。
相关资源
建议的内容
- Query Real-Time Data in Kafka Using SQL.
- How Streaming database differs from a Traditional database?
社区
°1âJoin the Risingwave Community
关于作者
- 请访问我的博客:âwww.iambobur.com
- 在Twitter上关注我:BoburUmurzokov50