如何使用Kafka和Python构建事件驱动的架构
#python #体系结构 #eventdriven #kafka

什么是事件驱动的架构?

事件驱动的体系结构是一种软件设计模式,允许应用程序实时或接近实时对事件作用。它通常与微服务并排使用。事件是应用程序中发生的任何重大事件或状态变化。例如,在电子商务平台中,事件可能是用户互动,例如将项目添加到购物车,退房或付款。

事件驱动的架构具有三个主要组成部分:事件生产者,路由器和消费者。事件生产者是生成事件的软件应用程序的组件。这些事件可以通过用户互动或数据更改触发。激活事件后,生产者将向路由器发送包含事件数据的消息。事件路由器过滤事件,改变它们并将其发送给需要它们的活动消费者。

事件驱动架构的好处

1)生产者和消费者的解耦:

在事件驱动的体系结构中将生产商和消费者解耦可使服务能够在无需了解消费者的情况下进行交流,反之亦然。无需等待彼此的响应,从而改善了响应时间。

2)减少了系统故障的可能性:

在事件驱动的体系结构中,如果一项服务失败,则不会导致应用程序中其他服务的失败。事件路由器是事件驱动的体系结构中的重要组件,它是缓冲区并存储事件。每当失败的服务返回在线时,活动路由器都会将活动提供给服务。事件驱动的体系结构可确保事件路由到正确的服务,并彼此隔离,从而降低了系统故障的风险。

3)灵活性:

事件驱动的架构使得可以轻松,快速地在应用程序中添加新的微服务。新的微服务可以轻松消耗当前事件。这带来了灵活性并为创新提供了空间。

了解卡夫卡

Kafka是最初由LinkedIn设计的开源流平台。成千上万的公司将其用于数据集成,高性能数据管道和实时数据分析。 Kafka可以在事件驱动的体系结构中以微服务作为生产者和消费者进行路由事件。 KAFKA的流行用例包括消息传递,活动监控,日志聚合和数据库。

卡夫卡的核心概念

1)主题:

Kafka中的主题是用于组织消息的类别。每个主题在Kafka群集上都有一个唯一的名称。生产者向主题发送活动,消费者从主题中读到。 Kafka主题使微服务可以轻松发送和接收活动。

2)生产者:

生产者是将事件发送到Kafka系统的应用程序。这些事件被发送到他们选择的特定主题。

3)消费者:

消费者是使用发送给Kafka系统的事件的应用程序。他们订阅了他们选择的主题并使用其中的数据。

4)经纪人:

经纪人是Kafka的实例,负责接收和发送事件驱动的体系结构。

设置环境。

首先,您必须在本地计算机上安装了Apache Kafka和Zookeeper。如果您使用Ubuntu,这里是安装Apache Kafaka https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04的教程。其他操作系统存在教程。

下一步是使用pip安装kafka-python

pip install kafka-python

您必须启动Zookeeper服务器和Kafka经纪人,然后才能执行要构建的项目。

项目代码

在此项目中,您将创建一个生产商,将文本从字符串列表发送到Kafka经纪人。然后,我们将创建一个读取文本并将其保存在蒙古集合中的消费者。

使用kafka的优点之一是,每当消费者分解时,您就可以修复它,消费者将继续从较早留下的地方阅读。您还可以创建另一个消费者,以继续从较早的地方留下来。这确保所有数据都存储在数据库中而不会丢失任何数据。

让我们开始构建您的项目;创建一个名为Producter.py的新的Python文件。导入所需的库。

from time import sleep 
from kafka import KafkaProducer 
from json import dumps

现在您已导入所需的库,下一步是初始化Kafka生产商。注意以下参数:

  1. bootstrap_servers = ['localhost:9092']:这用于设置主机和端口,以识别生产者和消费者将连接到的Kafka经纪人。由于默认值为localhost:9092

  2. ,因此不强制性设置此设置。
  3. value_serializer = lambda x:dumps(x).encode('utf-8'):此函数用于在将数据发送到KAFKA代理之前序列化数据。数据转换为JSON格式并编码为UTF-8。

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'))

现在,您将创建要发送给经纪人的项目列表。

messages = ["apple", "banana", "mango", "strawberry", "grapes", "orange", "pineapple", "peach", "kiwi"]

使用for循环,您将在列表中迭代,将每个文本作为词典以另一个“消息”作为经纪人的键。该密钥不是主题,而只是数据的关键。这可以通过将发送方法称为生产者来完成。发送方法提出了两个参数;您的主题和数据。每次迭代后您都会休息5秒。

for message in messages:
    data = {'message': message}
    producer.send('message_test', data) sleep(5)

下一步是创建消费者。创建另一个称为consumer.py的文件,并导入必要的库,例如JSON.LOADS,MONGOCLIENT和KAFKACONSUMER。使用pymongo并不是强制性的。您可以使用任何您舒适的数据库。

from pymongo import Mongoclient
from kafka import KafkaConsumer
from json import loads

下一步是创建kafkaconsumer。注意以下参数:

  1. 主题:这是第一个参数;就您而言,这是Message_test

  2. bootstrap_server = ['localhostâ:9092]:与生产者相同

  3. auto_offset_reset =“最早”:此参数用于处理消费者在故障后重新读取消息的地方。它可以设置为“最早”或“最新”。如果设置为“最新”,则消费者将从日志末尾开始阅读。如果设置为最早,则消费者将从最新的偏移量开始阅读。

  4. ENABL

  5. auto_commit_interval_ms = 1000ms:这设置了1秒钟的两个提交之间的间隔。

  6. group_id =``Messge_reader':这是消费者所属的组。消费者必须是一个团队的一部分,以使他们自动工作。

  7. value_deserializer = lambda x:loads(x.decode('utf-8'):这用于将数据归为一般JSON格式。

consumer = KafkaConsumer( 
    'message_test', 
    bootstrap_servers = ['localhost : 9092'], 
    auto_offset_reset = 'earliest', 
    enable_auto_commit = True, 
    group_id = 'my-group', 
    value_deserializer = lambda x : loads(x.decode('utf-8')) 
    )

下一步是连接到mongodb的Message_test集合。

client = MongoClient('localhost: 27017')
collection = client.message_test.message_test.message_test

可以通过通过消费者循环来提取KAFKA主题中的数据。然后,每个数据可以在循环时插入MongoDB集合中。

for message in consumer:
    message = message.value
    collection.insert_one(message)
    print(message + "added to" + collection)

要测试代码,执行product.py文件,然后打开一个新的终端并执行消耗。您会注意到列表中的所有消息是如何显示的。

按CTRL + C中断消费者,记下最后一条消息,然后再次执行消费者。您会注意到,消费者会拿起所有错过的消息,然后继续聆听新消息。

请注意,如果您在读取后一秒钟内中断,重新启动时将再次检索最后一条消息。这是因为auto_commit_interval设置为1秒。

使用Kafka和Python构建事件驱动的体系结构,有效地为数据流和存储创建了一个高度可扩展且可靠的平台。开发人员可以利用Kafka和Python的功能来开发能够实时处理大量数据的应用程序。借助事件驱动的架构,开发人员可以确保其应用程序可扩展和高效。