用Python,Dash和RisingWave可视化实时数据
#python #sql #data #ui

实时数据对于企业做出快速决策很重要。在视觉上看到这些数据可以帮助更快地做出决策。我们可以使用各种数据应用程序或仪表板创建数据的视觉表示。 Dash python库,它提供了各种内置组件,用于创建交互式图表,图形,表格和其他UI元素。 RisingWave是基于 SQL的流数据库用于实时数据处理。本文将解释如何使用Python,开源项目DASH和RisingWave对实时数据进行可视化。

如何实时可视化数据

我们知道,实时数据是从不同的数据源收集的,即立即生成和处理的数据。来源可以是典型的数据库,例如PostgresMySQL,以及像Kafka这样的消息经纪人。实时数据可视化由几个步骤组成,首先我们摄入然后过程,最后在仪表板上显示此数据

在订单交付数据的情况下,实时可视化此数据可以为餐厅或送货服务的性能提供宝贵的见解。例如,我们可以使用实时数据来监视交付订单需要多长时间,在交货过程中识别瓶颈,并随着时间的推移跟踪订单的变化。在处理不断变化的数据时,很难跟踪正在发生的一切并识别模式或趋势。使用仪表板和RisingWave之类的免费工具,我们可以创建交互式可视化,使我们能够探索和分析这种不断变化的数据。

在使用数据时,您可能会想到的第一种编程语言是Python,因为具有一系列库。 Dash是其中之一,它允许我们仅使用Python代码创建具有丰富且可自定义的用户界面的数据应用程序。 Dash建立在烧瓶,Plotly.js和React.js的顶部,它们是流行的Web开发工具,因此您不需要了解HTML,CSS或其他JavaScript Frameworks。

使用RisingWave,我们可以消耗various sources的数据流,创建了针对复杂查询进行优化的实质视图,而query real-time data using SQL。由于RisingWave与PostgreSQL与电线兼容,因此我们可以使用[psycopg2](https://pypi.org/project/psycopg2/)(Python的PostgreSQL客户端库)驱动程序连接到RisingWave,进行查询操作。请参阅下一节。

visualize data in real-time

可视化订单输送数据演示

在演示教程中,我们将利用以下GitHub repositoryâulisingwave演示,我们假设使用Docker compose建立了所有必要的东西。您可以在官方网站上查看其他方式到达run RisingWave。我们有一个名为delivery_orders的Kafka主题,其中包含在食品交货网站上的每个订单的活动。每个事件都包括有关该订单的信息,例如order IDrestaurant IDdelivery status。 Workload Generator(称为Datagen的Python脚本)将连续模拟随机模拟数据的生成,并将其流入Kafka主题。实际上,可以用来自Web应用程序或后端服务的数据替换此模拟数据。

在你开始之前

要完成本教程,您需要以下内容。

我在Windows OS,Docker Desktop和Python上测试的演示3.10.11安装了。

步骤1:设置RisingWave的演示群

首先,克隆RisingWave样品存储库到您当地的环境。

git clone https://github.com/risingwavelabs/risingwave.git

然后,integration_tests/delivery目录并从docker compose file启动演示群。

cd risingwave/integration_tests/delivery
docker compose up -d

确保所有容器都启动并运行!

步骤2:安装DASH和PSYCOPG2库

要安装DASH,您也可以参考网站上的Dash installation指南。基本上,我们需要通过运行以下pip install命令来安装两个库(dash本身和Pandas):

# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas

我们还应该安装[psycopg2](https://pypi.org/project/psycopg2/)与RisingWave流数据库进行交互:

pip install psycopg2-binary

步骤3:创建数据源

要使用RisingWave摄入实时数据,您首先需要设置data source。在演示项目中,应将KAFKA定义为数据源。我们将使用Python脚本在同一integration_tests/delivery目录中创建一个名为create-a-source.py的新文件,在该目录中我们连接到RisingWave,并创建一个表格来消费并持久使用delivery_orders KAFKA主题。您可以简单地将以下代码复制到新文件中。

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.

conn.autocommit = True # Set queries to be automatically committed.

with conn.cursor() as cur:
    cur.execute("""
CREATE TABLE delivery_orders_source (
    order_id BIGINT,
    restaurant_id BIGINT,
    order_state VARCHAR,
    order_timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'delivery_orders',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.

conn.close() # Close the connection.

创建文件后,您将运行python create-a-source.py,并且将在RisingWave中创建源表。

步骤4:创建一个实现的视图

接下来,我们创建了一个与我们创建表的方式相似的新的materialized view,我们创建了一个名为create-a-materialized-view.py的新文件,并使用psycopg2库运行SQL查询。也可以将最后两个步骤合并到一个python脚本文件中。

import psycopg2

conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
    window_start,
    restaurant_id,
    COUNT(*) AS total_order
FROM
    HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
        order_state = 'CREATED'
GROUP BY
    restaurant_id,
    window_start;""")

conn.close()

在SQL查询上方,在过去15分钟内实时计算从特定餐厅创建的总订单数量,并在物有的视图中缓存结果。如果发生任何数据更改或新的Kafka主题到来,RisingWave 自动增加并更新实体视图的结果。设置数据源后,实现的视图,您可以开始摄入数据并使用DASH可视化这些数据。

步骤5:构建仪表板应用程序

现在,我们构建了一个破折号应用程序来查询和可视化RisingWave中我们拥有的物质视图内容。您可以关注教程Dash in 20 mins,以了解DASH的基本构件。我们的示例应用程序代码以表格和图形格式显示餐厅订单数据。请参阅dash-example.py中的下面的Python代码:

import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px

# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")

# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)

# Create a Dash application
app = dash.Dash(__name__)

# Define layout
app.layout = html.Div(children=[
    html.H1("Restaurant Orders Table"),
    dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
    html.H1("Restaurant Orders Graph"),
    dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])

# Run the application
if __name__ == '__main__':
    app.run_server(debug=True)

此代码摘要从 restaurant_orders_view 使用pandas从 restaurant_orders_view 检索数据,并使用 dash_table.DataTable 和使用dcc.Graph的条形图在仪表板表中显示它。表格和条形图的列具有与实体视图('window_start','total_order'和'restaurant_id')中的列相对应的,而行与实体视图中的数据相对应。

步骤6:查看结果

您可以通过在Web浏览器中运行上述dash-example.py脚本来运行该应用程序,并导航到 http://localhost:8050/ (您将在终端中收到一条消息,告诉您转到此链接)。

Dash materialized view visualization

概括

总的来说,Dash是创建需要复杂UI和数据可视化功能的数据分析视图的强大工具,所有这些都使用Python编程语言的简单性和优雅性。当我们将其与RisingWave流数据库一起使用时,我们会深入了解实时数据,并可以帮助我们做出更明智的决策并采取行动以优化性能。

相关资源

建议的内容

社区

°1âMySQL

关于作者