A Simple Scalable, Event-Driven Ticket Purchasing System with Kafka & Python
#tutorial #python #kafka #basic

Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation, written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

We will build a simple ticket purchasing system to get familiar with Kafka's basic consumer and producer features.

(image: kafka_output)

First, we need to set up Kafka, ZooKeeper, and the Python Kafka package.

For Kafka and ZooKeeper, I am using Docker. Here is the docker-compose.yml file (note that the broker advertises localhost:29092 to clients on the host machine, which is the address the Python scripts below connect to):

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

You also have to install the kafka-python package:

pip install kafka-python

That's everything set up! Let's get into the code.

Overview

Let's think about it from the frontend: suppose a user requests to buy a ticket. On that event, our Kafka producer sends the respective order data as a stream to the Kafka cluster, and a Kafka consumer lets an application read that stream back out of the cluster. With the producer and consumer APIs we can create as many producers and consumers as we like, each doing a different task.
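Concretely, the messages flowing through the cluster in this system are just JSON-encoded bytes. Here is a minimal sketch of the encode/decode round trip the producer and consumers below perform (the helper function names are my own, not part of kafka-python):

```python
import json

def encode_order(order: dict) -> bytes:
    # what the producer sends as the message value
    return json.dumps(order).encode("utf-8")

def decode_order(raw: bytes) -> dict:
    # what a consumer does with message.value
    return json.loads(raw.decode("utf-8"))

order = {"ticket_id": 1, "user_id": "Alice", "price": 100}
print(decode_order(encode_order(order)) == order)  # True
```

Kafka itself never inspects these bytes; the value is an opaque payload, so producer and consumer just have to agree on the format.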

Code

producer.py

import json

from faker import Faker  # Faker is only used here to generate some random data
from kafka import KafkaProducer

fake = Faker()

producer = KafkaProducer(bootstrap_servers="localhost:29092")

for i in range(40_000):
    data = {
        "ticket_id": i,
        "user_id": fake.name(),
        "price": 100,
        "bank_account": fake.bban()
    }

    # "order_details" is just the topic (event) name in the Kafka cluster
    producer.send("order_details", json.dumps(data).encode("utf-8"))
    print(f"done sending ..{i}")

producer.flush()  # make sure everything still buffered is actually delivered
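As a side note, kafka-python can do the JSON encoding for you via the producer's value_serializer parameter, so send() accepts plain dicts. A small sketch under that assumption (the broker connection still requires the Docker Compose stack from above to be running):

```python
import json

def order_serializer(order: dict) -> bytes:
    # serializes every outgoing message value to JSON bytes
    return json.dumps(order).encode("utf-8")

def make_producer(bootstrap="localhost:29092"):
    # imported here so the serializer above stays usable without a broker
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap,
                         value_serializer=order_serializer)

# usage (needs a running broker):
# producer = make_producer()
# producer.send("order_details", {"ticket_id": 1, "price": 100})
# producer.flush()
```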

transaction.py

from kafka import KafkaConsumer, KafkaProducer
import json

KAFKA_TOPIC = "order_details"

producer = KafkaProducer(bootstrap_servers="localhost:29092")

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

print("start listening")

# iterating the consumer blocks and yields messages as they arrive
for message in consumer:
    print("ongoing transaction")
    consumed_message = json.loads(message.value.decode())

    # retrieve the price from the order produced earlier, now in the Kafka cluster
    data = {
        "price": consumed_message["price"]
    }

    # for demo purposes, re-publish just the price to a second topic for analytics
    producer.send("analytics", json.dumps(data).encode("utf-8"))
    print("Successful transaction..")
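The transformation transaction.py applies (keep only the price, drop everything else) is small enough to isolate as a pure function, which also makes it easy to test without a broker (the function name is my own):

```python
import json

def to_analytics_event(raw: bytes) -> bytes:
    # keep only the price field from an order_details message
    order = json.loads(raw.decode("utf-8"))
    return json.dumps({"price": order["price"]}).encode("utf-8")

sample = json.dumps({"ticket_id": 7, "price": 100}).encode("utf-8")
print(to_analytics_event(sample))  # b'{"price": 100}'
```

Inside the consumer loop, the body of `producer.send(...)` would then just be `to_analytics_event(message.value)`.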

analytics.py

from kafka import KafkaConsumer, KafkaProducer
import json

KAFKA_TOPIC = "analytics"
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)
print("start listening")

total_tickets_sold = 0
revenue = 0

# iterating the consumer blocks and yields messages as they arrive
for message in consumer:
    consumed_message = json.loads(message.value.decode())
    total_tickets_sold += 1
    revenue += consumed_message["price"]
    print("============\n\n")
    print("Receiving order")
    print("----------------")
    print(f"total tickets sold so far: {total_tickets_sold}")
    print(f"total revenue so far: {revenue}")
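Since the running totals in analytics.py are plain Python state, the aggregation step can also be factored out and unit-tested without Kafka, by replaying a batch of raw message values (a sketch; the helper is my own, not from the files above):

```python
import json

def aggregate(raw_messages):
    # replays a batch of analytics message values into (tickets_sold, revenue)
    tickets_sold, revenue = 0, 0
    for raw in raw_messages:
        event = json.loads(raw.decode("utf-8"))
        tickets_sold += 1
        revenue += event["price"]
    return tickets_sold, revenue

batch = [json.dumps({"price": 100}).encode("utf-8")] * 3
print(aggregate(batch))  # (3, 300)
```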


Done! Now it's time to test. Remember to bring up the Docker Compose file, then run the 3 Python files in 3 different terminals so you can watch what happens.

N.B. start the 2 consumer files (transaction.py and analytics.py) first, then run producer.py.

(image: kafka_output)