Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation and written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.
We will build a simple ticket-purchasing system to understand Kafka's basic producer and consumer functionality. First, we need to set up Kafka, ZooKeeper, and the Python Kafka package.
For Kafka and ZooKeeper I am using Docker. Here is the docker-compose.yml file:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
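Save this as docker-compose.yml and start the stack; with these port mappings, Kafka is reachable from the host at localhost:29092 (assuming Docker and Docker Compose are installed):
docker-compose up -d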
You also need to install the kafka-python package:
pip install kafka-python
Everything is set up! Let's move on to the code.
Overview
Let's think about it from the frontend: suppose a user requests to buy a ticket from the frontend. On that event, our Kafka producer sends the corresponding data as a stream to the Kafka cluster. A Kafka consumer then lets an application read that stream back from the cluster. We can create as many producers and consumers as we like through the producer and consumer APIs, each one handling a different task.
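With the broker defaults used here, the two topics we rely on ("order_details" and "analytics") are usually auto-created on the first write. If you prefer to create them explicitly, kafka-python ships an admin client; a minimal sketch (the partition and replication counts below are just illustrative for this single-broker setup):

from kafka.admin import KafkaAdminClient, NewTopic

# create the two topics used by the scripts below
admin = KafkaAdminClient(bootstrap_servers="localhost:29092")
admin.create_topics([
    NewTopic(name="order_details", num_partitions=1, replication_factor=1),
    NewTopic(name="analytics", num_partitions=1, replication_factor=1),
])
admin.close()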
Code
producer.py
import json
from faker import Faker  # the faker package is only used to generate some random data
from kafka import KafkaProducer

fake = Faker()
producer = KafkaProducer(bootstrap_servers="localhost:29092")

for i in range(40_000):
    data = {
        "ticket_id": i,
        "user_id": fake.name(),
        "price": 100,
        "bank_account": fake.bban(),
    }
    # "order_details" is simply the topic (event) name in the Kafka cluster
    producer.send("order_details", json.dumps(data).encode("utf-8"))
    print(f"done sending ..{i}")

producer.flush()  # make sure every buffered message reaches the broker before exiting
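Note that send() in kafka-python is asynchronous: the loop above only queues records and does not wait for broker acknowledgements. If you want to confirm that a record was actually written, you can block on the future that send() returns. A small sketch, not part of the original script:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:29092")

# send() returns a future; .get() blocks until the broker acknowledges the write
future = producer.send("order_details", json.dumps({"price": 100}).encode("utf-8"))
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)
producer.flush()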
transaction.py
from kafka import KafkaConsumer, KafkaProducer
import json

KAFKA_TOPIC = "order_details"

producer = KafkaProducer(bootstrap_servers="localhost:29092")
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

print("start listening")
for message in consumer:
    print("ongoing transaction")
    consumed_message = json.loads(message.value.decode())
    # retrieve the price from the order produced earlier, now stored in the Kafka cluster
    data = {
        "price": consumed_message["price"]
    }
    # for demo purposes, publish the price to another topic ("analytics") for calculation
    producer.send("analytics", json.dumps(data).encode("utf-8"))
    print("Successful transaction..")
Analytics.py
from kafka import KafkaConsumer
import json

KAFKA_TOPIC = "analytics"

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers="localhost:29092"
)

print("start listening")
total_tickets_sold = 0
revenue = 0
for message in consumer:
    consumed_message = json.loads(message.value.decode())
    total_tickets_sold += 1
    revenue += consumed_message["price"]
    print("============\n\n")
    print("Receiving order")
    print("----------------")
    print(f"total tickets sold so far: {total_tickets_sold}")
    print(f"total revenue so far: {revenue}")
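As a small variation, kafka-python can also do the JSON decoding for you through a value_deserializer, so each message arrives as a Python dict instead of raw bytes. A sketch of the same analytics consumer with that option:

import json
from kafka import KafkaConsumer

# the deserializer runs on every record, so message.value is already a dict
consumer = KafkaConsumer(
    "analytics",
    bootstrap_servers="localhost:29092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.value["price"])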
That's it; now it is time to test. Remember to run the Docker Compose file and the three Python files in three different terminals so you can watch what is happening.
N.B. start the two consumer files first, then producer.py.
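For reference, the run order looks roughly like this (assuming the scripts are saved with the names used above, each command in its own terminal):
docker-compose up -d   # start ZooKeeper and Kafka
python Analytics.py    # terminal 1: analytics consumer
python transaction.py  # terminal 2: transaction consumer/producer
python producer.py     # terminal 3: order producer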