使用Hazelcast和Pulsar的实时流处理
介绍
实时流处理的最有用的功能之一是结合各种技术的优势和优势,以提供独特的开发人员体验和一种实时处理数据的有效方法。 Hazelcast是一个实时分布式计算和存储平台,可针对实时事件流和传统数据源持续低延迟查询,聚合和状态计算。 Apache Pulsar是一个实时的多租户地理复制分布式酒吧消息传递和流媒体平台,用于实时工作负载,每小时处理数百万个活动。
但是,实时流处理并不是一件容易的事,尤其是在组合具有大量数据存储在外部数据存储中的多数数据以提供上下文和即时结果的数据时。当涉及使用时,Hazelcast可以用于实时流数据,静止数据或两者的组合,使用SQL直接查询流和批处理数据源,用于微服务的分布式协调,从一个区域复制数据,从到同一区域的数据中心之间的另一个或之间。
虽然Apache Pulsar可用于消息传递和流式用例,以取代多种产品,并提供其功能的超集。 Apache Pulsar是一个云本地的多元统一消息平台,可替换Apache Kafka,RabbitMQ,MQTT和旧消息传递平台。 Apache Pulsar为Hazelcast提供了无限的消息总线,可作为任何和所有数据源的即时来源和下沉。
Prerequisites
我们构建了一个应用程序,在该应用程序中,我们将Apache Pulsar的数据吸收到Hazelcast中,然后实时处理。要运行此应用程序,请确保您的系统具有以下组件:
如果您有Macos&Homebrew,则可以使用以下命令安装Hazelcast:
酿造淡淡的Hazelcast/Hz
酿造安装hazelcast@5.2.1
检查Hazelcast是否已安装:
hz -v
然后开始一个本地群集:
Hz开始
您应该在控制台中看到以下内容:
信息:[192.168.1.164]:5701 [dev] [5.2.1]
成员{size:1,ver:1} [
成员[192.168.1.164]:5701-4221D540-E34E-4FF2-8AD3-41E060B895CE这个
]
您可以使用以下命令在Docker中启动Pulsar:
Docker Run -IT -P 6650:6650 -P 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 bin/pulsar standalone
要安装管理中心,请使用以下方法之一,具体取决于您的操作系统:
酿造淡淡的Hazelcast/Hz
酿造安装hazelcast-management-center@5.2.1
检查安装了管理中心:
Hz -MC -V
数据采集:
对于我们的应用程序,我们希望通过Airnow数据提供商从美国各地摄入空气质量阅读。如果您想了解有关空气质量的更多信息,请查看AirNow的信息。
来源:https://docs.airnowapi.org/
使用简单的Java应用程序,我们向Airnow API进行休息,该API为美国各地的主要邮政编码提供空气质量阅读。 Java应用程序将JSON编码的Airnow数据发送到“ Airquality” Pulsar主题。
来源:https://github.com/tspannhw/spring-pulsar-airquality
我们还具有Java Pulsar功能,从空气标题中接收每个事件,并根据其类型的空气质量阅读将其解析为不同的主题。这包括PM2.5,PM10和臭氧。
来源:https://github.com/tspannhw/pulsar-airquality-function
示例空气数据
{“ dateObserved”:“ 2023-01-19”,“ houtobserved”:12,“ localtimezone”:“ est”,“ reportingingarea”:“ philadelphia”,“ cattecode”,“ cattecode”:“ pa”:“ pa”,“ latitude”,“ latitude”: 39.95,“经度”:-75.151,“ parametername”:“ pm10”,“ aqi”:19,“ category”:{“ number”:1,“ name”:“ good”,“ frome”,“ fromeproperties”:{}}},, “额外properties”:{}}
示例臭氧数据
{“ dateObserved”:“ 2023-01-19”,“ HOROBSEVER”:12,“ LOCALTIMEZONE”:“ EST”,“ reportingingarea”:“ Philadelphia”,“ cattecode”,“ cattecode”:“ PA”,“ parametername”:parametername“:: “ O3”,“ Latitude”:39.95,“经度”:-75.151,“ AQI”:8}
示例PM10数据
{“ dateObserved”:“ 2023-01-19”,“ HOROBSEVER”:12,“ LOCALTIMEZONE”:“ EST”,“ reportingingarea”:“ Philadelphia”,“ cattecode”,“ cattecode”:“ PA”,“ parametername”:parametername“:: “ PM10”,“ Latitude”:39.95,“经度”:-75.151,“ AQI”:19}
示例PM2.5数据
{“ dateObserved”:“ 2023-01-19”,“ HOROBSEVER”:12,“ LOCALTIMEZONE”:“ EST”,“ reportingingarea”:“ Philadelphia”,“ cattecode”,“ cattecode”:“ PA”,“ parametername”:parametername“:: “ PM2.5”,“ Latitude”:39.95,“经度”:-75.151,“ AQI”:54}
数据处理
为了处理收集的数据,我们使用theHazelcast Pulsar connector模块从pulsar主题中获取数据(注意:您可以使用相同的连接器写入PULSAR主题)。使用Hazelcast使我们能够在指定的流项目窗口上实时计算各种聚合功能(总和,AVG等)。 PULSAR连接器使用Pulsar客户端库,该库有两种不同的pulsar主题读取消息的方式。这些是消费者API和读取器API,都使用构建器模式(用于更多informationclick here)。
在您的POM文件中,导入以下依赖项。
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.1.4</version>
<groupId>com.hazelcast.jet.contrib</groupId>
<artifactId>pulsar</artifactId>
<version>0.1</version>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version>
我们创建了一个pulsarsources.pulsarreaderbuilder实例,以连接到位于pulsar的Pulsar群集:// localhost:6650
streamSourcesource = pulsarsources.pulsarreaderbuilder(
)topicName,
() -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
() -> Schema.JSON(Event.class),
Message::getValue).build();
然后,我们创建一个管道,以通过滑动窗口和汇总计数从源读取,然后写入logger:
管道p = pipeline.create();
P.Readfrom(source)
.withnativetimestamps(0)
.groupingkey(event :: getuser)
.window(滑动(seconds.tomillis(60),seconds.tomillis(30)))
.aggregate(counting())
.writeto(sinks.logger(wr-> string.format(
) "At %s Pulsar got %,d messages in the previous minute from %s.",
TIME\_FORMATTER.format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
wr.result(), wr.key())));
jobconfig cfg = new jobconfig()
.setProcessingGuarantee(ProcessingGuarantee.EXACTLY\_ONCE)
.setSnapshotIntervalMillis(SECONDS.toMillis(1))
.setName("pulsar-airquality-counter");
HazelCastInstance Hz = Hazelcast.bootstrappedInstance();
hz.getJet()。newjob(p,cfg);
您可以从IDE运行以前的代码(在这种情况下,它将创建自己的Hazelcast成员并在其上运行工作),也可以在先前启动的Hazelcast成员上运行此工作(在这种情况下,您需要创建一个可运行的jar,包括运行它所需的所有依赖项):
MVN软件包
bin/hz-cli提交目标/pulsar-example-1.0-snapshot.jar
取消工作并关闭Hazelcast群集:
bin/hz-cli取消pulsar-message-counter
Hz-Stop
结论
在这篇博客文章中,我们演示了如何结合各种技术的优势和优势,以提供独特的开发人员体验和一种实时处理数据的有效方法。我们将空气质量数据从Apache Pulsar流到Hazelcast,在那里我们实时处理数据。云技术的不断上升趋势,对实时智能应用程序的需求以及进行大规模处理数据的紧迫性使我们进入了实时流处理的新章节,在这些新章节中,衡量了潜伏期,而不是以几分钟的分钟,而是以毫秒为单位,并在毫秒内进行了。 。
Hazelcast允许您快速构建资源有效的实时应用程序。您可以从小边缘设备到大型云实例的任何规模部署它。一组Hazelcast节点共享数据存储和计算负载,这些节点可以在上下动态扩展。当您在群集中添加新节点时,数据会自动重新平衡整个群集,并且当前运行的计算任务(称为作业)通过处理保证金来快照其状态和规模。 PULSAR允许您使用您对消息传递协议的选择来快速在多种类型的消费者和生产者之间分发事件,并充当通用消息中心。 Pulsar将计算与存储分开,从而可以进行动态缩放和快速数据的有效处理。 Streamnative是由Apache Pulsar和Apache簿记员的原始创作者组成的公司。溪流为云和前提提供了Apache Pulsar的完整企业体验。
有关Hazelcast的更多信息
-
启动Viridian Serverless Cluster:无服务器是一种托管云服务,可提供付费的定价模型。无服务器集群自动尺度,以提供您的应用程序所需的资源。您只为您的应用程序所消耗的资源付费。
有关Apache Pulsar的更多信息
-
学习脉冲星的基础知识:虽然此博客没有涵盖Pulsar的基础知识,但有很多可用的资源可帮助您了解更多信息。如果您是Pulsar的新手,我们建议您服用self-paced Pulsar courses或instructor-led Pulsar training由Pulsar的一些原始创作者开发的。这将使您开始使用Pulsar并立即加速流。
-
在几分钟内旋转一个脉冲星群:如果您想尝试构建微服务而无需自己设置脉冲星群,请立即注册StreamNative Cloud。流云是一种简单,快速且具有成本效益的方式,可以在公共云中运行脉冲星。
-
加入Apache Pulsar Slack
https://github.com/tspannhw/pulsar-hazelcast-airquality
https://github.com/tspannhw/spring-pulsar-airquality
https://github.com/tspannhw/pulsar-airquality-function
有关作者的更多信息
蒂姆·班恩
溪流的开发人员倡导者
https://github.com/tspannhw/SpeakerProfile/blob/main/README.md
fawaz ghali
Hazelcast的主要开发人员倡导者