实时数据对于企业做出快速决策很重要。在视觉上看到这些数据可以帮助更快地做出决策。我们可以使用各种数据应用程序或仪表板创建数据的视觉表示。 Dash是 python库,它提供了各种内置组件,用于创建交互式图表,图形,表格和其他UI元素。 RisingWave是基于 SQL的流数据库用于实时数据处理。本文将解释如何使用Python,开源项目DASH和RisingWave对实时数据进行可视化。
如何实时可视化数据
我们知道,实时数据是从不同的数据源收集的,即立即生成和处理的数据。来源可以是典型的数据库,例如Postgres或MySQL,以及像Kafka这样的消息经纪人。实时数据可视化由几个步骤组成,首先我们摄入然后过程,最后在仪表板上显示此数据。
在订单交付数据的情况下,实时可视化此数据可以为餐厅或送货服务的性能提供宝贵的见解。例如,我们可以使用实时数据来监视交付订单需要多长时间,在交货过程中识别瓶颈,并随着时间的推移跟踪订单的变化。在处理不断变化的数据时,很难跟踪正在发生的一切并识别模式或趋势。使用仪表板和RisingWave之类的免费工具,我们可以创建交互式可视化,使我们能够探索和分析这种不断变化的数据。
在使用数据时,您可能会想到的第一种编程语言是Python,因为具有一系列库。 Dash是其中之一,它允许我们仅使用Python代码创建具有丰富且可自定义的用户界面的数据应用程序。 Dash建立在烧瓶,Plotly.js和React.js的顶部,它们是流行的Web开发工具,因此您不需要了解HTML,CSS或其他JavaScript Frameworks。
使用RisingWave,我们可以消耗various sources的数据流,创建了针对复杂查询进行优化的实质视图,而query real-time data using SQL。由于RisingWave与PostgreSQL与电线兼容,因此我们可以使用[psycopg2](https://pypi.org/project/psycopg2/)
(Python的PostgreSQL客户端库)驱动程序连接到RisingWave,进行查询操作。请参阅下一节。
可视化订单输送数据演示
在演示教程中,我们将利用以下GitHub repositoryâulisingwave演示,我们假设使用Docker compose建立了所有必要的东西。您可以在官方网站上查看其他方式到达run RisingWave。我们有一个名为delivery_orders
的Kafka主题,其中包含在食品交货网站上的每个订单的活动。每个事件都包括有关该订单的信息,例如order ID
,restaurant ID
和delivery status
。 Workload Generator(称为Datagen的Python脚本)将连续模拟随机模拟数据的生成,并将其流入Kafka主题。实际上,可以用来自Web应用程序或后端服务的数据替换此模拟数据。
在你开始之前
要完成本教程,您需要以下内容。
- 确保您在环境中安装了Docker和Docker Compose。
- 确保PostgreSQL交互式终端PSQL安装在您的环境中。有关详细说明,请参见Download PostgreSQL。
- 为您的操作系统下载并安装Python 3。
pip
命令将自动安装。
我在Windows OS,Docker Desktop和Python上测试的演示3.10.11安装了。
步骤1:设置RisingWave的演示群
首先,克隆RisingWave样品存储库到您当地的环境。
git clone https://github.com/risingwavelabs/risingwave.git
然后,integration_tests/delivery
目录并从docker compose file启动演示群。
cd risingwave/integration_tests/delivery
docker compose up -d
确保所有容器都启动并运行!
步骤2:安装DASH和PSYCOPG2库
要安装DASH,您也可以参考网站上的Dash installation指南。基本上,我们需要通过运行以下pip install
命令来安装两个库(dash本身和Pandas):
# This also brings along the Plotly graphing library.
# Plotly is known for its interactive charts
# Plotly Express requires Pandas to be installed too.
pip install dash pandas
我们还应该安装[psycopg2](https://pypi.org/project/psycopg2/)
与RisingWave流数据库进行交互:
pip install psycopg2-binary
步骤3:创建数据源
要使用RisingWave摄入实时数据,您首先需要设置data source。在演示项目中,应将KAFKA定义为数据源。我们将使用Python脚本在同一integration_tests/delivery
目录中创建一个名为create-a-source.py
的新文件,在该目录中我们连接到RisingWave,并创建一个表格来消费并持久使用delivery_orders
KAFKA主题。您可以简单地将以下代码复制到新文件中。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566") # Connect to RisingWave.
conn.autocommit = True # Set queries to be automatically committed.
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE delivery_orders_source (
order_id BIGINT,
restaurant_id BIGINT,
order_state VARCHAR,
order_timestamp TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'delivery_orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) ROW FORMAT JSON;""") # Execute the query.
conn.close() # Close the connection.
创建文件后,您将运行python create-a-source.py
,并且将在RisingWave中创建源表。
步骤4:创建一个实现的视图
接下来,我们创建了一个与我们创建表的方式相似的新的materialized view,我们创建了一个名为create-a-materialized-view.py
的新文件,并使用psycopg2
库运行SQL查询。也可以将最后两个步骤合并到一个python脚本文件中。
import psycopg2
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW restaurant_orders_view AS
SELECT
window_start,
restaurant_id,
COUNT(*) AS total_order
FROM
HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)
WHERE
order_state = 'CREATED'
GROUP BY
restaurant_id,
window_start;""")
conn.close()
在SQL查询上方,在过去15分钟内实时计算从特定餐厅创建的总订单数量,并在物有的视图中缓存结果。如果发生任何数据更改或新的Kafka主题到来,RisingWave 自动增加并更新实体视图的结果。设置数据源后,实现的视图,您可以开始摄入数据并使用DASH可视化这些数据。
步骤5:构建仪表板应用程序
现在,我们构建了一个破折号应用程序来查询和可视化RisingWave中我们拥有的物质视图内容。您可以关注教程Dash in 20 mins,以了解DASH的基本构件。我们的示例应用程序代码以表格和图形格式显示餐厅订单数据。请参阅dash-example.py
中的下面的Python代码:
import psycopg2
import pandas as pd
import dash
from dash import dash_table
from dash import dcc
import dash_html_components as html
import plotly.express as px
# Connect to the PostgreSQL database
conn = psycopg2.connect(database="dev", user="root", password="", host="localhost", port="4566")
# Retrieve data from the materialized view using pandas
df = pd.read_sql_query("SELECT window_start, restaurant_id, total_order FROM restaurant_orders_view;", conn)
# Create a Dash application
app = dash.Dash(__name__)
# Define layout
app.layout = html.Div(children=[
html.H1("Restaurant Orders Table"),
dash_table.DataTable(id="restaurant_orders_table", columns=[{"name": i, "id": i} for i in df.columns], data=df.to_dict("records"), page_size=10),
html.H1("Restaurant Orders Graph"),
dcc.Graph(id="restaurant_orders_graph", figure=px.bar(df, x="window_start", y="total_order", color="restaurant_id", barmode="group"))
])
# Run the application
if __name__ == '__main__':
app.run_server(debug=True)
此代码摘要从 restaurant_orders_view
使用pandas从 restaurant_orders_view
检索数据,并使用 dash_table.DataTable
和使用dcc.Graph
的条形图在仪表板表中显示它。表格和条形图的列具有与实体视图('window_start','total_order'和'restaurant_id')中的列相对应的,而行与实体视图中的数据相对应。
步骤6:查看结果
您可以通过在Web浏览器中运行上述dash-example.py
脚本来运行该应用程序,并导航到 http://localhost:8050/
(您将在终端中收到一条消息,告诉您转到此链接)。
概括
总的来说,Dash是创建需要复杂UI和数据可视化功能的数据分析视图的强大工具,所有这些都使用Python编程语言的简单性和优雅性。当我们将其与RisingWave流数据库一起使用时,我们会深入了解实时数据,并可以帮助我们做出更明智的决策并采取行动以优化性能。
相关资源
- Real-time data analytics with Apache Superset, Redpanda, and RisingWave.
- How to monitor live stream metrics.
建议的内容
社区
°1âMySQL
关于作者
- 请访问我的博客:âwww.iambobur.com
- 在Twitter上关注我:@BoburUmurzokov