与Docker创建一个Kafka集群,并在Golang中创建和开发消费者并生产
#网络开发人员 #go #docker #kafka

介绍£O:

Apache Kafka是一个分布式的流媒体平台,允许发送和接收真实时间事件流。在Kafka的背景下,生产负责向一个或多个Tanpics发送消息,而消费量负责接收这些消息并处理它们。

在本教程中,我们将探讨如何使用Docker组成并使用Golang编程语言来配置Kafka群集。这种组合非常适合建立高性能和高性能应用,可以从Kafka的功能和灵活性中受益。

pron© - 条件:

开始之前,请确保母亲中安装了以下物品:

与Docker Compiss配置Kafka:

首先创建一个名为docker-compose.yml的文件,并在您选择的方向上添加以下内容:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

在此文件中,我们正在配置两个服务:Zookeeper,这对于Kafka的操作和Kafka本身所必需。端口2181用于连接到Zookeeper和端口9092,以连接到Kafka。请注意,我们定义了KAFKA_ADVERTISED_LISTENERS属性,以便在围栏(PLAINTEXT://kafka:9092)和外部(koud3)内都可以访问kafka。

要启动kafka群集,在项目的根部运行以下命令:

docker-compose up -d

opapel do zookeeper no kafka

Zookeeper是Kafka用来协调和管理群集的集中式服务。除了协助检测故障和过程故障方面,它还负责为每个方的聚会选举羊毛,维护段落的配置和状态信息。

在我们的示例中,我们将Zookeeper配置为Docker Compiss中的单独服务。 Kafka取决于Zookeeper来注册和维护群集信息,从而确保所有人都不同步。

配置GO环境

现在,卡夫卡(Kafka)处于执行状态,让我们设置我们的GO环境以开发消费者和生产。打开终端并为项目创建新的方向。在其中,通过运行以下命令来初始化职责:

go mod init kafka-tutorial

这将创建一个go.mod文件来管理项目依赖。

我们将使用lib kude5
lib Sarama是一个客户库,可以与GO中的Apache Kafka进行交互。它提供了用于创建Kafka消息生产者和消费者的幻想API,并管理Tâtpic,party and party and offset。

要安装它,执行以下命令:

go get github.com/Shopify/sarama

创造消费者和生产:

现在,让我们创建消费者和producy.go文件分别实施消费者和生产。

没有消费者。添加以下内容:

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln("Failed to start consumer:", err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln("Failed to close consumer:", err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatalln("Failed to start partition consumer:", err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln("Failed to close partition consumer:", err)
        }
    }()

    for message := range partitionConsumer.Messages() {
        fmt.Printf("Received message: Key = %s, Value = %s\n", string(message.Key), string(message.Value))
    }
}

没有生产者。

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln("Failed to start producer:", err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln("Failed to close producer:", err)
        }
    }()

    message := &sarama.ProducerMessage{
        Topic: "test-topic",
        Value: sarama.StringEncoder("Hello, Kafka!"),
    }

    partition, offset, err := producer.SendMessage(message)
    if err != nil {
        log.Fatalln("Failed to send message:", err)
    }

    log.Printf("Message sent! Partition = %d, Offset = %d\n", partition, offset)
}

在上面,我们正在消费者连接到KAFKA,并从最古老的偏移量中使用测试主题测试的消息。在制作人,我们正在发送带有“你好,卡夫卡!”的消息。对于测试主题”。

执行消费者和生产

执行生产,打开终端,导航项目方向并运行以下命令:

go run producer.go

要执行消费者,打开另一个终端,直接浏览项目并运行以下命令:

go run consumer.go

现在,您看到消费者收到生产者发送的消息。

找到£o:

在本教程中,我们学习了如何使用Docker组成以及如何使用GO语言来配置Kafka群集以及如何开发消费者和生产。 Kafka是一个强大的流媒体平台,可以与Vain应用程序集成以处理真实时期事件。有了这种组合,您准备构建scald应用程序