KAFKA:事件驱动的微服务
#python #kafka #dataengineering

简介:
Microservices Architecture由于其能够构建可扩展和可维护的系统的能力而获得了巨大的知名度。在本文中,我们将使用Apache Kafka作为中央事件总线探讨事件驱动的微服务的概念。这种方法可以构建高度可扩展,松散耦合和实时系统。

Kafka中以事件为导向的微服务是什么?

KAFKA中的事件驱动的微服务是指软件体系结构模式,其中各个微服务通过使用Apache Kafka作为中央事件总线的事件进行异步传达。在这种模式下,服务通过产生和消费事件而脱钩并相互作用。

它是如何工作的?

  1. 事件制作:
    当某些动作或状态变化发生在其域内时,每个微服务都会产生事件。这些事件代表了服务中有意义的事件或更新。微服务将这些事件发布给KAFKA主题,指定与正在产生的事件类型相对应的主题。

  2. 事件消费:
    对特定类型事件感兴趣的其他微服务订阅相关的Kafka主题并消费事件。他们按照生产的顺序获得活动并独立处理。消费微服务可以执行各种操作,例如更新其内部状态,触发进一步的业务逻辑或为响应而产生新事件。

  3. 事件模式和序列化:
    Kafka事件通常以JSON或AVRO等特定格式序列化。微服务需要就架构和序列化格式达成共识,以有效地产生和消费事件。使用架构注册表或版本控制策略有助于在发展事件结构时保持向后兼容性。

  4. 事件采购和重播:
    Kafka的耐用性和保留功能使其适用于活动采购。事件采购涉及将应用程序的状态持续为KAFKA中的一系列事件。这使审计,重建状态并保持变化的历史记录。微服务可以在任何时间点重建其状态。

  5. 可伸缩性和容错性:
    Kafka的分布式性质允许高可扩展性和容错性。微服务的多个实例可以并行消费来自Kafka主题的事件,从而可以水平缩放。 Kafka的复制确保数据耐用性和容错性,即使发生故障,也不会丢失事件。

  6. 事件驱动的处理和分析:
    事件驱动的微服务体系结构允许在事件流进行实时处理和分析。微服务可以根据他们消耗的事件来分析模式,生成见解和触发操作。例如,服务可能会检测异常​​,生成警报,更新实时仪表板或将数据馈送到机器学习模型中以进行预测。

与Webhook和数据库集成

可以将Webhooks和数据库集成到事件驱动的微服务体系结构中,以增强功能并启用无缝的通信和数据持久性。可以使用以下方式:

webhooks:
Webhooks是当发生特定事件时应用程序接收实时通知或回调的一种方式。它们可以按以下方式集成到事件驱动的微服务中:
事件通知:Microservice可以使用另一个微服务或第三方服务注册Webhook回调URL,而不是直接消费Kafka主题的事件。当发生相关事件时,生产微服务将事件发布给Kafka,并触发已注册URL的Webhook通知。然后,消费微服务可以通过处理Webhook请求来处理事件。

外部服务集成:Webhooks可用于与不本地支持Kafka的外部服务集成。例如,当您的微服务中发生事件时,您可以将事件发布给Kafka,并同时将Webhook通知发送到外部服务,以使其实时更新。

脱钩的通信:网络钩提供了微服务之间的松散耦合通信机制。一个微服务可以通过Webhooks通知另一个服务,而不是直接服务到服务通信,从而使服务可以独立发展并减少紧密的耦合。

数据库集成:
数据库在事件驱动的微服务体系结构中起着至关重要的作用,用于数据持久性和维护应用程序状态。以下是可以使用数据库的方式:
状态微服务:由于各种原因,某些微服务可能需要维护其状态。数据库可用于存储和检索这些微服务的状态信息。消耗事件时,微服务可以相应地在数据库中更新其状态。

事件采购:数据库通常用于事件采购,其中事件存储在事件日志或事件存储中。与其仅依靠KAFKA,还可以在数据库中坚持使用事件,以支持事件重播,审核和重建微服务状态。

数据丰富:微服务可能需要通过来自外部来源或参考数据的其他数据丰富消耗的事件。数据库可用于存储和检索这些其他数据,从而使微服务在事件处理过程中丰富事件数据。

缓存:数据库可用作缓存层,以提高性能并减少微服务的负载。微服务可以从数据库中的事件中缓存经常访问数据,避免重复处理相同事件。

创建事件驱动的微服务并添加Webhooks和数据库。

您可以在此处找到整个代码github.com/James-Wachuka/event-driven-microservices

示例:发布Kafka事件并发送Webhook通知的Python代码。

from kafka import KafkaProducer
import requests
import json

# Kafka producer configuration
bootstrap_servers = 'localhost:9092'

# Webhook URLs
user_created_webhook = 'http://localhost:5000/webhook/user_created'
order_placed_webhook = 'http://localhost:5000/webhook/order_placed'

# Create Kafka producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Publish user created event
user = {'id': 11, 'name': 'King'}
producer.send('user_created', value=user)

# Send webhook notification for user created event
requests.post(user_created_webhook, json=user)

# Publish order placed event
order = {'id': 11, 'product': 'sofa', 'amount': 100000}
producer.send('order_placed', value=order)

# Send webhook notification for order placed event
requests.post(order_placed_webhook, json=order)

# Close the producer connection
producer.close()

一个带有两个Webhook端点的烧瓶应用程序来处理创建和订购的用户。

from flask import Flask, request

app = Flask(__name__)

@app.route('/webhook/user_created', methods=['POST'])
def handle_user_created_webhook():
    payload = request.get_json()
    # Perform necessary actions or trigger other processes based on the user created event
    print('New user created:', payload)
    # ...
    return 'Webhook received and processed successfully', 200

@app.route('/webhook/order_placed', methods=['POST'])
def handle_order_placed_webhook():
    payload = request.get_json()
    # Perform necessary actions or trigger other processes based on the order placed event
    print('New order placed:', payload)
    # ...
    return 'Webhook received and processed successfully', 200

if __name__ == '__main__':
    app.run()

数据库示例:下面的代码建立了与PostgreSQL数据库的连接,并为用户和订单创建表。它使用Kafka消费者来消费来自“用户_Created”和“ order_phaced”的主题的消息。消耗的消息将插入相应的数据库表中,将更改插入数据库。

# Consume user_created and order_placed events
for message_1, message_2 in zip(user_consumer,order_consumer):

    user = message_1.value
    order = message_2.value

    cursor = conn.cursor()
    # Insert user data into the database
    user_query = f"INSERT INTO users (id, name) VALUES ({user['id']}, '{user['name']}')"
    order_query = f"INSERT INTO orders (id, product, amount) VALUES ({order['id']}, '{order['product']}', {order['amount']})"

    cursor.execute(user_query)
    print('New user created:', user)
    cursor.execute(order_query)
    print('New order placed:', order)


    conn.commit()
    cursor.close()


# Close the database connection
conn.close()

使用烧瓶应用程序在共享数据库上执行CRUD操作:


# API endpoint to update user information
@app.route('/users/<user_id>', methods=['PUT'])
def update_user(user_id):
    try:
        # Extract updated user information from the request
        user_data = request.get_json()
        name = user_data['name']

        # Update user information in the database
        cursor = conn.cursor()
        query = f"UPDATE users SET name = '{name}' WHERE id = {user_id}"
        cursor.execute(query)
        conn.commit()

        # Publish user_updated event to Kafka
        event_data = {'id': int(user_id), 'name': name}
        producer.send('user_updated', value=event_data)

        return jsonify({'message': 'User updated successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500

带有Kafka事件驱动的微服务提供了几种好处,使它们在现实世界中很重要:

可伸缩性和灵活性:
通过通过事件解开服务,体系结构变得更加可扩展和灵活。每个服务都可以独立开发,部署和缩放,使团队可以同时处理不同的服务。可以在不影响整个系统的情况下添加,修改或删除服务。卡夫卡(Kafka)的分布式性质确保即使在高流量场景中,事件也可以可靠地交付给所有感兴趣的服务。

松散的耦合和弹性:
事件驱动的方法促进了服务之间的松散耦合。服务只需要了解他们产生和消费的事件的结构,而不是其他服务的具体实施详细信息。这种松散的耦合使系统对更改更具弹性,因为服务可以独立发展而不会破坏其他人。如果服务暂时不可用,则可以将事件存储在Kafka中,直到服务恢复为止。

实时处理和分析:
通过由Kafka提供的事件驱动的架构,在事件流上执行实时处理和分析变得更加容易。服务可以实时消费事件,分析模式,生成洞察力和触发操作。例如,服务可能会检测异常​​,生成警报,更新实时仪表板或将数据馈送到机器学习模型中以进行预测。

集成和生态系统:
Kafka拥有丰富的生态系统,并支持广泛的连接器,框架和工具。这使其更容易与其他系统和服务集成,例如数据库,数据仓库,流处理框架和监视工具。 Kafka Connect启用与外部系统的无缝集成,而Kafka流和其他流处理框架为数据处理和转换提供了强大的功能。

结论:
带有KAFKA的事件驱动的微服务为构建可扩展,弹性和松散耦合的系统提供了强大的方法。此模式在各种领域中广泛采用,包括电子商务,金融,电信,物流,物流和物联网,响应能力,可伸缩性以及响应性,可伸缩性以及响应性,以及可伸缩性,以及可伸缩性,以及可伸缩性和可伸缩性。适应性对于成功至关重要。