最近,我有机会尝试Apache的Kafka进行监视服务,我感到惊讶的是,您如何在几行代码中设置完整的刚起步的事件流系统。我很快意识到,我们可以在事物的中心与Kafka建立强大的系统。通知系统,分布式数据库同步,监视系统是考虑Kafka的用例时会想到的一些应用程序。
为了更深入地了解Kafka,我尝试设置一个系统监视应用程序,该应用程序查找CPU和RAM使用之类的系统统计信息,并在仪表板中可视化它们。
我使用的工具是
- kafka作为消息队列
- Golang用于几个微服务
- 仪表板的反应
- Docker和Docker编写编排
建筑学
我要去的架构是这个。
服务
服务 | 描述 |
---|---|
系统统计 | Golang服务,可捕获Host Machine的CPU和RAM使用 |
经纪人 | KAFKA经纪人,促进活动流 |
生产者 | Golang服务,从系统统计服务接收使用数据并将其推入Kafka |
消费者 | Golang服务,订阅了Kafka经纪人并将数据流到仪表板 |
仪表板 | 实时可视化系统统计信息。通过Websocket从消费者那里获取系统统计数据。使用React | 建造
tl; dr
如果您了解上述每个服务的体系结构和目的,则可以在以下存储库中找到实现
如果您决定坚持下去,我们将详细介绍这些服务。
深潜
好吧!感谢您坚持。我们将介绍每项服务的关键部分,最后我们将看到这与Docker Compose的混合在一起。
系统统计
此服务的主要目的是从主机机器收集系统使用统计信息并将其发送到producer
服务。
在这里,我正在阅读/proc/meminfo
文件中的内存统计信息,其中包含有关当前内存使用情况的信息。此文件可在所有基于Linux/UNIX的系统
我们本来可以使用一个更为集中的工具,例如Prometheus或Cadvisor来收集系统统计数据,但这不是本文的主要目标。
我们想每秒收集记忆统计。因此,我们需要一个CRON作业,每秒运行并读取/proc/meminfo
文件中的内存信息。我们将编写一个读取,解析和返回内存统计的函数。
接下来,我们需要一个函数,该函数接收统计数据并将其发送给生产者。这将是一个简单的http帖子呼叫。
最后,我们需要将这两者放在一起。每次作业运行时,这两个功能都需要串联调用。我们为此服务的main
功能看起来像这样。
真棒,现在我们有一项服务,每秒从主机计算机收集内存信息,并将其发送给生产者
制作人
此生产者服务的目的是接收系统统计数据并将其推入Kafka经纪人。为此,我们有一个HTTP帖子端点,该端点获取体内的统计信息并将其写入Kafka主题。
我们在此服务中的主要功能非常简单。有一个帖子端点要接收统计。我正在使用Gin在这里路由的框架。
在我们写信给Kafka之前,我们需要设置我们的Kafka作家。让我们这样做。
现在,我们可以设置我们的PushToKafka
函数,这是我们帖子端点的处理程序。
太好了!我们有一项Producer
服务,该服务通过帖子端点接收系统统计信息,并将这些数据写入我们的Kafka经纪人。
消费者
消费者服务做两件事。
- 订阅KAFKA主题以接收系统统计数据
- 继电器系统统计数据通过WebSockets到
Dashboard
在我们的服务可以收听Kafka消息之前,我们需要设置KAFKA读取器
现在,我们的服务可以订阅并收听我们的Producer
推入Kafka队列的数据。我们将设置一个侦听功能,该功能读取来自Kafka的消息
请注意,它采用回调函数。每当KAFKA发出消息时,此回调函数就会被调用。我们将使用此回调函数将系统统计数据传递到
Dashboard
Service
下一步是每当我们从kafka收到它时,将系统统计数据发送到仪表板。但是,在我们执行此操作之前,我们需要一个端点,特别是插座端子服务可以连接到的套接字端点。我们将在我们的main
函数中定义此端点
和我们的升级处理程序看起来像
要建立WebSocket连接,服务和客户端之间的HTTP连接必须升级才能使用WebSocket协议。
connections.Subscribe(conn)
调用是为了跟踪所有插座连接。
我们的Consumer
服务的最后一步是将消息从Kafka队列传递到仪表板服务。为此,我们设置了一个称为SendMessage
的函数,这将是我们的回调函数Kafka Listen
函数。
真棒!现在,我们有了Consumer
服务,该服务聆听我们的Kafka队列中的消息,并通过Websocket将消息传递给我们的仪表板服务。
仪表板
仪表板是一项非常简单的服务,它通过Websocket连接到Consumer
服务,并在表UI中呈现系统统计信息。
无需详细介绍如何设置React应用程序或标记和样式如何,这里重要的部分是使用Consumer
Service的套接字端点创建套接字连接,并在我们的套接字上设置onmessage
回调函数。<<<<<<<<<<<<<<<<<<<<<<<<<<<<< /p>
我们有一个以系统统计数据为状态的React组件。每当我们从WebSocket收到数据时,我们都会更新此状态。
卡夫卡经纪人
设置的最终服务确实是Kafka经纪人,它促进了整个消息队列。
我正在使用示例Docker从Kafka的Github
组成配置就是这样!我们已经完成了设置。
将所有这些放在一起
现在,如果我们使用docker-compose
运行所有服务,
docker-compose build
docker-compose up
导航到http://localhost:WEB_UI_PORT
,我们可以看到仪表板每秒使用系统统计数据进行更新。
真棒!如您所见,我们的表每秒获得最新的系统统计值的更新。
我尚未详细介绍如何为我们的服务设置构建流程。请参阅
Dockerfile
和docker-compose.yml
文件以获取更多信息。
结论
这是一个准骨kafka设置,无非是单个主题中的继电器消息。我写了这篇文章,只是为了对Kafka的含义有基本的了解,并了解可以使用Kafka的方法。希望阅读此书的人们能得到一些东西。
感谢您的来临,欢呼!