使用Java,Spring和流数据库查询实质性的视图
#database #java #springboot #microservices

实现的视图是数据库管理系统中的一个强大工具,它允许用户预先计算和存储查询结果,提高性能并减少对昂贵计算的需求。但是,在现代快速的业务环境中,数据库中的数据可能会经常更改,需要频繁更新以实现的视图。流数据库使我们能够连续摄入,并实时处理来自不同数据源的大量数据。在本文中,我们将探索如何使用Java,Spring和流数据库(例如RisingWave)查询实质性的视图。

学习目标

您将在整篇文章中学习以下内容:

  • 如何摄入kafka主题并运行实时查询。
  • 如何使用流数据库创建实现的视图。
  • 构建一个Java应用程序,该应用程序读取并从物有的视图中揭示数据。

建议的内容

在开始这篇文章之前,我强烈建议您还阅读其他帖子以了解how a streaming database differs from a traditional databasewhy we need to use materialized views to query microserviceshow to query real-time data in Kafka using SQL

样本方案:分析订单交付性能

订购交付性能对于任何在线销售产品的企业都是至关重要的指标。它反映了订单履行过程的效率和有效性,并直接影响客户满意度。为了分析订单交付绩效,企业需要跟踪关键绩效指标(KPI),例如交货时间,订单准确性和交付成本。在本文的范围内,我们将创建一个使用Java和Spring Boot构建的示例order-delivery application(您可以在GitHub上找到源代码),并学习如何使用流数据库并实现的视图来实时分析食品订单交付性能。

在建筑图下面说明了总体数据流:

Analyzing order delivery performance

  1. 我们有一个名为delivery_orders的Kafka主题,其中包含在食品交付网站上放置的每个订单的活动。每个事件都包括有关该订单的信息,例如order IDrestaurant IDdelivery status。 Workload Generator(称为Datagen的Python脚本)将连续模拟随机模拟数据的生成,并将其流入Kafka主题。实际上,可以用来自应用程序的数据替换此模拟数据。
  2. RisingWave Steaming数据库连接到Kafka经纪人,并使用其Kafka connector读取Kafka主题。它还将来自源的数据持续到RisingWave table
  3. RisingWave允许我们使用SQL 来查询流数据,我们可以通过减少昂贵的连接和计算的需求来创建实质性的视图,可针对复杂查询进行优化。随着新的更改到达,存储在物质视图中的数据将自动更新。
  4. 那么,我们从样本Spring boot Web API中查询实现的视图。

查询实质性的视图演示

让我们假设我们有一个名为restaurant_orders_view的物有意义的视图,该视图计算了过去15分钟内实时从不同餐馆创建的总订单数量。现在,我们的任务是将这种实现的视图结果揭示为API endpoint/orders/count-total-restaurant-orders的订单服务中。在Java中,您可以使用JDBC查询实质性的视图,这是用于连接数据库的Java API。此外,您可以利用弹簧框架来简化与JDBC合作的过程并构建REST API。

由于RisingWave与PostgreSQL互相兼容,您可以使用PostgreSQL JDBC驱动程序连接到RisingWave并与Java应用程序与RisingWave进行交互。您也可以将其他客户库用于不同的编程语言(NodeJSPythonGo)。

下一个部分向您展示了一个逐步指南,以查询Java和Spring中的实现视图。

开始之前

本教程包括设置两个项目:

  • 我们将在当地环境上旋转github上现有的RisingWave全面功能齐全的demo cluster,该环境由多个Risingwave组件组成。为了简化此任务,它利用docker-compose.yaml文件,其中包括Kafka消息经纪的其他容器和数据生成服务。
  • Spring Boot应用程序,一个具有必要依赖项的Maven项目,我们还将在本地运行。

要完成本教程,您需要以下内容:

步骤1:设置演示群

首先,克隆RisingWave的存储库到您当地的环境。

git clone https://github.com/risingwavelabs/risingwave.git

然后,导航到integration_tests/delivery目录,并从docker compose file启动演示集群。

cd risingwave/integration_tests/delivery
docker compose up -d

确保所有容器都启动并运行!

步骤2:复制并打开Spring Boot应用程序。

设置了流数据库后,我们将运行Spring Boot应用程序。另外,叉/克隆项目并在Java编辑器中打开。

git clone https://github.com/Boburmirzo/risingwave-java-spring.git

步骤3:安装项目。

然后从项目根文件夹中执行mvn clean install。在Maven pom.xml文件中,我们将postgresql依赖项和spring-boot-starter-jdbc驱动程序依赖性添加到我们的项目中。这些依赖项将为春季与JDBC合作提供必要的类和接口。

步骤4:运行演示后波应用程序。

现在,我们可以从编辑器中运行主要的DemoRisingWaveApplication.java。运行应用程序后,您访问端口8080上的端点或在浏览器上打开此URL。 http://localhost:8080/orders/count-total-restaurant-orders。您将从Risingwave获得实现的视图响应。

[["2023-05-06T05:28:00.000+00:00",1,1],["2023-05-06T05:33:00.000+00:00",1,3...]]

分解项目文件

让我们分解项目中的一些重要文件以了解每个角色。

连接到RisingWave

要查询Spring Boot应用程序中的实现视图,首先需要通过配置Spring Boot application.yml文件来建立与流数据库的连接。 datasource 部分设置了一个PostgreSQL数据库连接,并带有kude12的RisingWave数据库URL(它指向在RisingWave Demo cluster上运行的dev数据库),一个“ root”的用户名和一个空密码。

server:
  port: 8080
spring:
  profiles:
    active: dev
  application:
    name: materialized-view-api
  datasource:
    url: jdbc:postgresql://localhost:4566/dev
    username: root
    password:
...

创建数据源并实现的视图

接下来,在Startup.java文件中,我们有三个SQL语句,当Spring Boot应用程序启动时,将执行这些sql语句,以创建一个源为risingwave中的kafka,删除现有的实体视图,并通过从应用程序资源目录中的view.json文件中读取此数据,以包含一个列表列表的Interallied视图,从而创建新的。例如,以下是一个SQL语句,该语句在RisingWave流数据库中创建一个名为 delivery_orders_source 的表。该表有四个列: order_id restaurant_id order_state order_timestamp

public static final String CREATE_RISINGWAVE_SOURCE_TABLE = "CREATE TABLE IF NOT EXISTS delivery_orders_source (\n" +
        "    order_id BIGINT,\n" +
        "    restaurant_id BIGINT,\n" +
        "    order_state VARCHAR,\n" +
        "    order_timestamp TIMESTAMP\n" +
        ") WITH (\n" +
        "    connector = 'kafka',\n" +
        "    topic = 'delivery_orders',\n" +
        "    properties.bootstrap.server = 'message_queue:29092',\n" +
        "    scan.startup.mode = 'earliest'\n" +
        ") ROW FORMAT JSON;";

该语句还包括 WITH 子句,该条款指定了表的其他选项。在这种情况下, connector 选项设置为 kafka ,这表明该表将连接到KAFKA主题。 topic 选项设置为 delivery_orders ,指定了kafka主题的名称。

查询使用JDBCTEMPLATE的实现视图

Spring提供了许多功能,可帮助您使用数据库,包括 JdbcTemplate 类,这简化了执行SQL语句和处理结果的过程。要查询春季的实现视图,您可以使用 JdbcTemplate 类执行SQL查询,该查询从实现的视图中选择数据,然后从结果集中提取数据。

String createSql = String
    .format(CREATE_MATERIALIZED_VIEW, materializeView.getName(), materializeView.getQuery());
jdbcTemplate.execute(createSql);

查询JPA

的实现视图

一旦创建了实现的视图,也可以使用Spring Data JPA存储库在Java中查询。您可以创建一个名为RestaurantOrdersView的新的entity class,该26映射到RisingWave中物化的视图表中的列。

@Entity
@Getter
@Setter
@Table(name = "restaurant_orders_view")
public class RestaurantOrdersView implements Serializable {

    @Id
    @Column(name = "restaurant_id")
    private Long restaurantId;

    @Column(name = "window_start")
    private Timestamp windowStart;

    @Column(name = "total_order")
    private BigInteger totalOrder;
}

使用实体类,我们可以创建一个弹簧数据JPA存储库,该存储库可以使用标准的JPA查询方法查询实现的视图,也可以使用本机SQL查询。例如,以下代码检索总餐厅订单:

@Component
public class OrderDeliveryRepositoryImpl implements OrderDeliveryRepository {

    @PersistenceContext
    private EntityManager entityManager;

    @Override
    public List<RestaurantOrdersView> countTotalRestaurantOrders() {
        Query query = entityManager.createNativeQuery("SELECT * FROM restaurant_orders_view WHERE restaurant_id = 1");
        return query.getResultList();
    }
}

然后可以从我们的OrderDeliveryService调用此方法countTotalRestaurantOrders我们可以添加额外的业务逻辑并将其传递给OrderDeliveryController,以通过剩余端点使此数据可用。

@RestController
@RequiredArgsConstructor
@RequestMapping("/orders")
@CrossOrigin(value = "*", allowedHeaders = "*")
public class OrderDeliveryController {

    private final OrderDeliveryService orderDeliveryService;

    @GetMapping("/count-total-restaurant-orders")
    @CrossOrigin(value = "*", allowedHeaders = "*")
    public List<RestaurantOrdersView> getCountTotalRestaurantOrders() {
        return orderDeliveryService.countTotalRestaurantOrders();
    }
}

结论

总而言之,使用流数据库,我们可以实时从一个或多个数据源进行摄入流数据,您可以合并多个流并创建实质性的视图。 Spring Boot为与流行数据源集成(例如流数据库)提供了内置支持。我们已经看到企业如何通过流数据库,Java和Spring实时分析订单交付性能,从而使他们能够识别瓶颈并提高其订单履行过程的效率和有效性。

相关资源

社区

ðJoin the Risingwave Community

关于作者

请访问我的博客:âwww.iambobur.com