使用SQL查询KAFKA中的实时数据
#sql #database #datascience #kafka

Apache Kafka分布式流媒体平台,您可以存储和处理实时数据流。它通常在现代数据体系结构中用于捕获和分析用户与Web和移动应用程序以及物联网设备数据,日志和系统指标的交互。它通常用于实时数据处理,数据管道和事件驱动的应用程序。但是,查询存储在Kafka中的数据可能具有挑战性,尤其是对于SQL比Kafka's native APIs更舒适的用户。这是流式SQL引擎和数据库可以有所帮助的地方。实际上可以直接在流数据上运行SQL。

在本文中,您将学习为什么流媒体sql是有益的,如何在kafka主题中使用RisingWave查询数据的工具 如何执行实时数据的一些基本分析。

学习目标ð

您将在整篇文章中学习以下内容:

  • 基于SQL的流处理。
  • 了解为什么SQL是查询Kafka的关键。
  • SQL比Kafka本机API的其他好处是什么?
  • 与SQL一起查询Kafka的现实世界示例用例。
  • 使用实体视图和SQL查询分析订单输送系统性能。

为什么基于SQL的流处理?

有很多方法可以处理和操纵数据。我们可以通过脚本语言,在Excel电子表格中,甚至可以通过在Java,C ++,Python或其他一些语言中编写低级代码来完成此操作。

实际上,流处理中有三个选项:

  • 低级代码或API。
  • 基于SQL的处理。
  • 基于UI的构建块,以更高的定义进行转换。

我认为,SQL是最好的解决方案 - 当您考虑整体功率,速度和易用性时,其他两个选择之间的良好折衷。它使我们进入了Streaming SQL方法,该方法扩展了SQL,并能够处理实时数据流。

此外,与Kafka合作的挑战之一是如何有效地分析和从KAFKA主题中存储的大量数据中提取见解。传统的批处理处理方法,例如Hadoop MapReduceApache Spark,可能是缓慢而昂贵的,可能不适合实时分析。
为了应对这一挑战,您可以使用KAFKA使用SQL查询来实时分析和从数据中提取见解。

kafka上的SQL提供了一些好处

用SQL查询Kafka可以提供使用Kafka的本机API的几个好处。这是您可能想使用SQL查询Kafka的一些原因:

熟悉SQL :许多开发人员和数据分析师对SQL比Kafka的本机API更熟悉。使用SQL可以使这些用户更容易与Kafka进行交互并查询存储在Kafka主题中的数据。

来自Kafka API复杂性的抽象:Kafka的本机API可以很复杂,并且需要大量的样板代码来读取,写入和操纵数据。使用SQL可以提供一个抽象层,以简化与Kafka的相互作用并隐藏Kafka API的复杂性。

标准化:SQL是一种标准查询语言,在行业中广泛使用。使用SQL查询KAFKA可以提供与Kafka交互的标准化方法,从而更容易将KAFKA与支持SQL的其他系统和工具集成。

灵活性:SQL提供了广泛的查询功能,包括过滤,排序,汇总和加入数据。使用SQL查询Kafka可以在查询和分析Kafka主题中存储的数据时提供更大的灵活性。

SQL也很丰富。使用案例语句使用来定义的定义过滤,定义列转换并进行有条件的操作。可以通过 ed ed和汇总的不同类型的对象以及组。尽管使用数据库,您通常会加入表格,而在流媒体案例中,您可以加入流,窗口和caches 以产生结果。在SQL中这样做很容易。

大多数流媒体数据库技术都使用SQL出于以下原因:RisingWaveMaterializeKsqlDBApache Flink,等等,等等。这篇文章解释了how to choose the right streaming database

与SQL查询Kafka的现实世界示例用例是什么?

使用SQL有很多用于查询流数据的现实世界用例,在这里,我分别使用RisingWave列出了其中一些。

RisingWave是用于流处理的开源分布式SQL数据库。 RisingWave通过本机更改数据捕获连接到MySQL和PostgreSQL来源,接受来自Apache Kafka,Apache PulsarAmazon KinesisRedpanda和数据库等来源的数据。它使用了实体视图的概念,涉及缓存查询操作的结果,并且对于长期运行的流处理查询非常有效。

Query Real Time Data in Kafka Using SQL

  1. Real-time ad performance analysis在网站或移动应用程序上的某些用户互动上。

  2. Server performance anomaly detection自动化可能会使DevOps团队改变生活。

  3. Social media platforms events processing并分析实时活动。

  4. Monitoring live stream metrics,例如视频质量和查看计数。

基本上,您可以在building faster any real-time应用程序中使用此方法。

分析订单交付性能(DEMO)

分析订单交付性能对于任何电子商务业务至关重要。了解订单的快速有效地可以帮助识别瓶颈,提高客户满意度并最终取得收入。例如,为了分析食品订单交付性能,我们可以利用Kafka流以及RisingWave流数据库上的SQL查询来实时提取和分析数据。

在演示教程中,我们将利用以下GitHub repository,假设使用Docker compose设置所有必要的东西。

使用此配置,Docker启动了一个演示集群,其中包括所有上升波组件,包括前端节点,计算节点,元数据节点和MinIO。工作负载生成器将开始生成随机模拟数据,并将其馈入Kafka主题。在此演示群集中,实体视图的数据将存储在Minio实例中。

我们有一个名为delivery_orders的Kafka主题,其中包含在电子商务网站上订购的每个订单的事件。每个事件都包括有关该顺序的信息,例如order IDrestaurant IDdelivery status

在你开始之前

要完成本教程,您需要以下内容:

步骤1:设置演示群

首先,克隆RisingWave存储库到您当地的环境。

git clone https://github.com/risingwavelabs/risingwave.git

然后,integration_tests/delivery目录并从docker compose file启动演示群。

cd risingwave/integration_tests/delivery
docker compose up -d

确保所有容器都启动并运行!

步骤2:创建KAFKA的数据流源

要将RisingWave连接到Kafka,我们需要配置新的data ingestion source

打开一个新的SQL Shell,我们将使用Postgres Interactive terminal psql进行运行查询和从Risingwave检索结果。然后,创建一个Kafka source,以允许RisingWave访问delivery_orders主题中的消息。

CREATE SOURCE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;

步骤3:定义一个实现的视图

现在,我们已经将RisingWave连接到Kafka流,但是RisingWave尚未开始消耗数据。要仅提取我们感兴趣的数据并加快查询,我们需要定义materialized views。创建了实现的视图后,RisingWave将开始消耗Kafka主题的数据。

让我们假设我们要在过去15分钟内实时计算从特定餐厅创建的总订单数量。我们可以使用以下SQL查询来实现这一目标:

CREATE MATERIALIZED VIEW restaurant_orders AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM 
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE 
    order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;

让我们看一下我们刚刚创建的实现视图,它使用hop()时间窗口函数来安排使用order_timestamp Timestamp创建订单的时间之间的时间间隔,而窗口大小为15分钟。

步骤4.在Kafka主题上运行流媒体查询

现在我们可以编写一个简单的流询问,从Kafka获取消息。

SELECT * FROM restaurant_orders WHERE restaurant_id = 1;

您应该看到RisingWave已执行查询并返回结果:

    window_start     | restaurant_id | total_order
---------------------+---------------+-------------
 2023-03-18 16:50:00 |             1 |         120
 2023-03-18 17:20:00 |             1 |          80
 2023-03-18 17:30:00 |             1 |          14
 2023-03-18 18:17:00 |             1 |          18
 2023-03-18 18:41:00 |             1 |         166
 2023-03-18 18:49:00 |             1 |         176
 2023-03-18 19:24:00 |             1 |           1
 2023-03-19 12:22:00 |             1 |           5
 2023-03-19 12:55:00 |             1 |         188
 2023-03-19 13:02:00 |             1 |         214
 2023-03-19 13:46:00 |             1 |         191
 2023-03-18 16:35:00 |             1 |           8
 2023-03-18 16:55:00 |             1 |         147
 2023-03-18 19:08:00 |             1 |          70
 2023-03-18 19:18:00 |             1 |          16
 2023-03-19 12:32:00 |             1 |          42
 2023-03-19 13:19:00 |             1 |         207
 2023-03-19 13:55:00 |             1 |         195
 2023-03-19 14:01:00 |             1 |         188
 2023-03-18 17:06:00 |             1 |         172
 2023-03-18 17:15:00 |             1 |         120
 2023-03-18 17:28:00 |             1 |          22
 2023-03-18 18:36:00 |             1 |         139
 2023-03-18 18:46:00 |             1 |         188
 2023-03-18 18:58:00 |             1 |         144
 2023-03-18 19:12:00 |             1 |          44
 2023-03-19 12:52:00 |             1 |         174

我们还可以使用SQL查询来计算更复杂的指标,例如交付成功率。这是一个示例SQL查询,可以计算每家餐厅的交付成功率:

CREATE MATERIALIZED VIEW restaurant_delivery_success_rate AS
SELECT
    restaurant_id,
    SUM(CASE WHEN order_state = 'DELIVERED' THEN 1 ELSE 0 END) / COUNT(*) AS delivery_success_rate
FROM 
    delivery_orders_source
GROUP BY
    restaurant_id;

此查询将restaurant_id分组,并计算每家餐厅的交付成功率。交付成功率计算为分配的订单数量除以每家餐厅的订单总数。

 restaurant_id | delivery_success_rate
---------------+-----------------------
             0 |                     0
             1 |                     1
             2 |                     0

更多分析您可以根据流数据结构进行。例如,您可以在一定时间内计算订单的平均交货时间和成本。

可选步骤:停止演示集群

完成后,您将删除容器和通过运行docker compose down生成的数据。

结论

通过使用SQL查询实时提取和分析数据,我们可以对交付性能获得宝贵的见解并确定改进领域。我们还可以使用实体视图来实时存储和更新计算的指标,从而使我们能够快速检索和可视化数据。

下一步

在整个演示中,我们仅介绍了Kafka上数据流的一些简单查询。也可以使用Rishingwave的实现视图将结果导出到另一个Kafka主题,并且还有许多其他功能,您可以在其中加入两个流,处理晚期事件,分析大量的子流和聚合输出。您可以通过检查前面提供的用例场景来观察它们。

相关资源

建议的内容

社区

1 at 17 Join the Risingwave Community

关于作者

访问我的个人博客:www.iambobur.com 1