Apache Kafka是分布式流媒体平台,您可以存储和处理实时数据流。它通常在现代数据体系结构中用于捕获和分析用户与Web和移动应用程序以及物联网设备数据,日志和系统指标的交互。它通常用于实时数据处理,数据管道和事件驱动的应用程序。但是,查询存储在Kafka中的数据可能具有挑战性,尤其是对于SQL比Kafka's native APIs更舒适的用户。这是流式SQL引擎和数据库可以有所帮助的地方。实际上可以直接在流数据上运行SQL。
在本文中,您将学习为什么流媒体sql是有益的,如何在kafka主题中使用RisingWave到查询数据的工具 如何执行实时数据的一些基本分析。
学习目标ð
您将在整篇文章中学习以下内容:
- 基于SQL的流处理。
- 了解为什么SQL是查询Kafka的关键。
- SQL比Kafka本机API的其他好处是什么?
- 与SQL一起查询Kafka的现实世界示例用例。
- 使用实体视图和SQL查询分析订单输送系统性能。
为什么基于SQL的流处理?
有很多方法可以处理和操纵数据。我们可以通过脚本语言,在Excel电子表格中,甚至可以通过在Java,C ++,Python或其他一些语言中编写低级代码来完成此操作。
实际上,流处理中有三个选项:
- 低级代码或API。
- 基于SQL的处理。
- 基于UI的构建块,以更高的定义进行转换。
我认为,SQL是最好的解决方案 - 当您考虑整体功率,速度和易用性时,其他两个选择之间的良好折衷。它使我们进入了Streaming SQL方法,该方法扩展了SQL,并能够处理实时数据流。
此外,与Kafka合作的挑战之一是如何有效地分析和从KAFKA主题中存储的大量数据中提取见解。传统的批处理处理方法,例如Hadoop MapReduce或Apache Spark,可能是缓慢而昂贵的,可能不适合实时分析。
为了应对这一挑战,您可以使用KAFKA使用SQL查询来实时分析和从数据中提取见解。
kafka上的SQL提供了一些好处
用SQL查询Kafka可以提供使用Kafka的本机API的几个好处。这是您可能想使用SQL查询Kafka的一些原因:
熟悉SQL :许多开发人员和数据分析师对SQL比Kafka的本机API更熟悉。使用SQL可以使这些用户更容易与Kafka进行交互并查询存储在Kafka主题中的数据。
来自Kafka API复杂性的抽象:Kafka的本机API可以很复杂,并且需要大量的样板代码来读取,写入和操纵数据。使用SQL可以提供一个抽象层,以简化与Kafka的相互作用并隐藏Kafka API的复杂性。
标准化:SQL是一种标准查询语言,在行业中广泛使用。使用SQL查询KAFKA可以提供与Kafka交互的标准化方法,从而更容易将KAFKA与支持SQL的其他系统和工具集成。
灵活性:SQL提供了广泛的查询功能,包括过滤,排序,汇总和加入数据。使用SQL查询Kafka可以在查询和分析Kafka主题中存储的数据时提供更大的灵活性。
SQL也很丰富。使用案例语句使用来定义的定义过滤,定义列转换并进行有条件的操作。可以通过 ed ed和汇总的不同类型的对象以及组。尽管使用数据库,您通常会加入表格,而在流媒体案例中,您可以加入流,窗口和caches 以产生结果。在SQL中这样做很容易。
大多数流媒体数据库技术都使用SQL出于以下原因:RisingWave,Materialize,KsqlDB,Apache Flink,等等,等等。这篇文章解释了how to choose the right streaming database。
与SQL查询Kafka的现实世界示例用例是什么?
使用SQL有很多用于查询流数据的现实世界用例,在这里,我分别使用RisingWave列出了其中一些。
RisingWave是用于流处理的开源分布式SQL数据库。 RisingWave通过本机更改数据捕获连接到MySQL和PostgreSQL来源,接受来自Apache Kafka,Apache Pulsar,Amazon Kinesis,Redpanda和数据库等来源的数据。它使用了实体视图的概念,涉及缓存查询操作的结果,并且对于长期运行的流处理查询非常有效。
-
Real-time ad performance analysis在网站或移动应用程序上的某些用户互动上。
-
Server performance anomaly detection自动化可能会使DevOps团队改变生活。
-
Monitoring live stream metrics,例如视频质量和查看计数。
基本上,您可以在building faster any real-time应用程序中使用此方法。
分析订单交付性能(DEMO)
分析订单交付性能对于任何电子商务业务至关重要。了解订单的快速有效地可以帮助识别瓶颈,提高客户满意度并最终取得收入。例如,为了分析食品订单交付性能,我们可以利用Kafka流以及RisingWave流数据库上的SQL查询来实时提取和分析数据。
在演示教程中,我们将利用以下GitHub repository,假设使用Docker compose设置所有必要的东西。
使用此配置,Docker启动了一个演示集群,其中包括所有上升波组件,包括前端节点,计算节点,元数据节点和MinIO。工作负载生成器将开始生成随机模拟数据,并将其馈入Kafka主题。在此演示群集中,实体视图的数据将存储在Minio实例中。
我们有一个名为delivery_orders
的Kafka主题,其中包含在电子商务网站上订购的每个订单的事件。每个事件都包括有关该顺序的信息,例如order ID
,restaurant ID
和delivery status
。
在你开始之前
要完成本教程,您需要以下内容:
- 确保您在环境中安装了Docker和Docker Compose。
- 确保PostgreSQL交互式终端PSQL安装在您的环境中。有关详细说明,请参见Download PostgreSQL。
步骤1:设置演示群
首先,克隆RisingWave存储库到您当地的环境。
git clone https://github.com/risingwavelabs/risingwave.git
然后,integration_tests/delivery
目录并从docker compose file启动演示群。
cd risingwave/integration_tests/delivery
docker compose up -d
确保所有容器都启动并运行!
步骤2:创建KAFKA的数据流源
要将RisingWave连接到Kafka,我们需要配置新的data ingestion source。
打开一个新的SQL Shell,我们将使用Postgres Interactive terminal psql
进行运行查询和从Risingwave检索结果。然后,创建一个Kafka source,以允许RisingWave访问delivery_orders
主题中的消息。
CREATE SOURCE delivery_orders_source (
order_id BIGINT,
restaurant_id BIGINT,
order_state VARCHAR,
order_timestamp TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'delivery_orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) ROW FORMAT JSON;
步骤3:定义一个实现的视图
现在,我们已经将RisingWave连接到Kafka流,但是RisingWave尚未开始消耗数据。要仅提取我们感兴趣的数据并加快查询,我们需要定义materialized views。创建了实现的视图后,RisingWave将开始消耗Kafka主题的数据。
让我们假设我们要在过去15分钟内实时计算从特定餐厅创建的总订单数量。我们可以使用以下SQL查询来实现这一目标:
CREATE MATERIALIZED VIEW restaurant_orders AS
SELECT
window_start,
restaurant_id,
COUNT(*) AS total_order
FROM
HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
order_state = 'CREATED'
GROUP BY
restaurant_id,
window_start;
让我们看一下我们刚刚创建的实现视图,它使用hop()时间窗口函数来安排使用order_timestamp
Timestamp创建订单的时间之间的时间间隔,而窗口大小为15分钟。
步骤4.在Kafka主题上运行流媒体查询
现在我们可以编写一个简单的流询问,从Kafka获取消息。
SELECT * FROM restaurant_orders WHERE restaurant_id = 1;
您应该看到RisingWave已执行查询并返回结果:
window_start | restaurant_id | total_order
---------------------+---------------+-------------
2023-03-18 16:50:00 | 1 | 120
2023-03-18 17:20:00 | 1 | 80
2023-03-18 17:30:00 | 1 | 14
2023-03-18 18:17:00 | 1 | 18
2023-03-18 18:41:00 | 1 | 166
2023-03-18 18:49:00 | 1 | 176
2023-03-18 19:24:00 | 1 | 1
2023-03-19 12:22:00 | 1 | 5
2023-03-19 12:55:00 | 1 | 188
2023-03-19 13:02:00 | 1 | 214
2023-03-19 13:46:00 | 1 | 191
2023-03-18 16:35:00 | 1 | 8
2023-03-18 16:55:00 | 1 | 147
2023-03-18 19:08:00 | 1 | 70
2023-03-18 19:18:00 | 1 | 16
2023-03-19 12:32:00 | 1 | 42
2023-03-19 13:19:00 | 1 | 207
2023-03-19 13:55:00 | 1 | 195
2023-03-19 14:01:00 | 1 | 188
2023-03-18 17:06:00 | 1 | 172
2023-03-18 17:15:00 | 1 | 120
2023-03-18 17:28:00 | 1 | 22
2023-03-18 18:36:00 | 1 | 139
2023-03-18 18:46:00 | 1 | 188
2023-03-18 18:58:00 | 1 | 144
2023-03-18 19:12:00 | 1 | 44
2023-03-19 12:52:00 | 1 | 174
我们还可以使用SQL查询来计算更复杂的指标,例如交付成功率。这是一个示例SQL查询,可以计算每家餐厅的交付成功率:
CREATE MATERIALIZED VIEW restaurant_delivery_success_rate AS
SELECT
restaurant_id,
SUM(CASE WHEN order_state = 'DELIVERED' THEN 1 ELSE 0 END) / COUNT(*) AS delivery_success_rate
FROM
delivery_orders_source
GROUP BY
restaurant_id;
此查询将restaurant_id
分组,并计算每家餐厅的交付成功率。交付成功率计算为分配的订单数量除以每家餐厅的订单总数。
restaurant_id | delivery_success_rate
---------------+-----------------------
0 | 0
1 | 1
2 | 0
更多分析您可以根据流数据结构进行。例如,您可以在一定时间内计算订单的平均交货时间和成本。
可选步骤:停止演示集群
完成后,您将删除容器和通过运行docker compose down
生成的数据。
结论
通过使用SQL查询实时提取和分析数据,我们可以对交付性能获得宝贵的见解并确定改进领域。我们还可以使用实体视图来实时存储和更新计算的指标,从而使我们能够快速检索和可视化数据。
下一步
在整个演示中,我们仅介绍了Kafka上数据流的一些简单查询。也可以使用Rishingwave的实现视图将结果导出到另一个Kafka主题,并且还有许多其他功能,您可以在其中加入两个流,处理晚期事件,分析大量的子流和聚合输出。您可以通过检查前面提供的用例场景来观察它们。
相关资源
建议的内容
社区
1 at 17 Join the Risingwave Community
关于作者
访问我的个人博客:www.iambobur.com 1