介绍£O:
Apache Kafka是一个分布式的流媒体平台,允许发送和接收真实时间事件流。在Kafka的背景下,生产负责向一个或多个Tanpics发送消息,而消费量负责接收这些消息并处理它们。
在本教程中,我们将探讨如何使用Docker组成并使用Golang编程语言来配置Kafka群集。这种组合非常适合建立高性能和高性能应用,可以从Kafka的功能和灵活性中受益。
pron© - 条件:
开始之前,请确保母亲中安装了以下物品:
- Docker: https://docs.docker.com/get-docker/
- Docker撰写:https://docs.docker.com/compose/install/
- Go (Golang): https://golang.org/doc/install
与Docker Compiss配置Kafka:
首先创建一个名为docker-compose.yml
的文件,并在您选择的方向上添加以下内容:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
在此文件中,我们正在配置两个服务:Zookeeper,这对于Kafka的操作和Kafka本身所必需。端口2181用于连接到Zookeeper和端口9092,以连接到Kafka。请注意,我们定义了KAFKA_ADVERTISED_LISTENERS
属性,以便在围栏(PLAINTEXT://kafka:9092
)和外部(koud3)内都可以访问kafka。
要启动kafka群集,在项目的根部运行以下命令:
docker-compose up -d
opapel do zookeeper no kafka
Zookeeper是Kafka用来协调和管理群集的集中式服务。除了协助检测故障和过程故障方面,它还负责为每个方的聚会选举羊毛,维护段落的配置和状态信息。
。在我们的示例中,我们将Zookeeper配置为Docker Compiss中的单独服务。 Kafka取决于Zookeeper来注册和维护群集信息,从而确保所有人都不同步。
配置GO环境
现在,卡夫卡(Kafka)处于执行状态,让我们设置我们的GO环境以开发消费者和生产。打开终端并为项目创建新的方向。在其中,通过运行以下命令来初始化职责:
go mod init kafka-tutorial
这将创建一个go.mod
文件来管理项目依赖。
我们将使用lib kude5
lib Sarama
是一个客户库,可以与GO中的Apache Kafka进行交互。它提供了用于创建Kafka消息生产者和消费者的幻想API,并管理Tâtpic,party and party and offset。
要安装它,执行以下命令:
go get github.com/Shopify/sarama
创造消费者和生产:
现在,让我们创建消费者和producy.go文件分别实施消费者和生产。
没有消费者。添加以下内容:
package main
import (
"fmt"
"log"
"github.com/Shopify/sarama"
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln("Failed to start consumer:", err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln("Failed to close consumer:", err)
}
}()
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalln("Failed to start partition consumer:", err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln("Failed to close partition consumer:", err)
}
}()
for message := range partitionConsumer.Messages() {
fmt.Printf("Received message: Key = %s, Value = %s\n", string(message.Key), string(message.Value))
}
}
没有生产者。
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln("Failed to start producer:", err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln("Failed to close producer:", err)
}
}()
message := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatalln("Failed to send message:", err)
}
log.Printf("Message sent! Partition = %d, Offset = %d\n", partition, offset)
}
在上面,我们正在消费者连接到KAFKA,并从最古老的偏移量中使用测试主题测试的消息。在制作人,我们正在发送带有“你好,卡夫卡!”的消息。对于测试主题”。
执行消费者和生产
执行生产,打开终端,导航项目方向并运行以下命令:
go run producer.go
要执行消费者,打开另一个终端,直接浏览项目并运行以下命令:
go run consumer.go
现在,您看到消费者收到生产者发送的消息。
找到£o:
在本教程中,我们学习了如何使用Docker组成以及如何使用GO语言来配置Kafka群集以及如何开发消费者和生产。 Kafka是一个强大的流媒体平台,可以与Vain应用程序集成以处理真实时期事件。有了这种组合,您准备构建scald应用程序