ApacheKafka®Java客户库库的第一步
#java #data #apachekafka

很难想象关键任务软件的开发而不依赖于ApacheKafka®等事件流平台。但是,也许您是ApacheKafka®的新手,并且想更深入。你在正确的地方!本文将使用Java使用ApacheKafka®引导您的第一步。

如果您迫不及待地想看到最终结果,this GitHub repository has the producer and consumer我们将在本文提供的分步指南中撰写。

配备了您的需求

在这篇博客文章中,您将学习如何在Java创建ApacheKafka®生产商和消费者。您将准备安全连接所需的配置文件,然后编写一些Java将消息发送到群集并进行轮询。

在开始编写代码之前,您需要准备几件事。

Apache Kafka群集

首先,您需要Apache Kafka群集本身。为了简化设置,您可以使用Aiven for Apache Kafka®。 ApacheKafka®的Aiven是一种完全管理的解决方案,它在短短几分钟内就可以使用正确的配置来构建一个群集,从而照顾安全的身份验证和其他必需品。如果您还没有Aiven帐户,请注册a free trial

进入控制台后,创建一个新服务:在中创建服务对话框选择 apache kafka 您的云选择,最接近您云区域启动服务计划就足够了。为您的服务设置名称,例如 apache-kafka-playground

部署服务时,您可以执行其他任务。

Screenshot showing newly created Aiven for Apache Kafka service, service is still rebuilding

Java项目与依赖关系

您需要的第二件事是计算机上安装的JDK和一个基本的Java项目。本文假设您对Java具有基本知识。运行此代码时,我使用了Java 11 JDK,但是ApacheKafka®支持Java 17,因此您有很多选择。

您还需要一个官方的低级ApacheKafka®客户库,用于Java,A 参考客户端,以创建生产者和消费者。请注意,如果您打算与Java API一起使用Kafka Streams或Kafka Connect,则需要额外的库。

在Java项目中包含kafka-client的最方便方法是使用MavenGradle。选择最新版本的kafka-client from mvnrepository,选择您使用的构建工具,复制依赖项并将其添加到项目中。

Screenshot showing selecting gradle dependency for apache kafka client from mvnrepository

我使用了gradle。我将依赖项粘贴到 build.gradle 文件中,并通过选择重新加载所有Gradle项目来让Intellij Idea加载必要的文件。

除了ApacheKafka®客户端外,您还需要其他几个库:

设置ApacheKafka®的配置和身份验证

在创建生产者和消费者之前,您需要指定几个配置属性。这些确保Kafka经纪人和客户交换的信息保持完整,安全和机密。

Aiven提供了两种身份验证方法: tls sasl 。在本文中,我们将使用TLS进行身份验证和加密。如果您想使用SASL,请查看the SASL instructions in Aiven's documentation

通常,要执行TLS握手,您需要配置ApacheKafka®经纪人和客户端。为了简化事物,Aiven会为经纪人提供TLS配置,因此您只需要配置客户端即可。而且,正如我们将看到的那样,即使客户Aiven为您完成了大部分工作。

要在客户端和服务器之间建立TLS连接,需要发生三件事:

  1. 客户需要验证服务器的身份。
  2. 服务器需要验证客户端的身份。
  3. 必须对客户端和服务器之间的所有传输中的所有消息进行加密。

为此,我们将使用Java TrustStore和Keystores。

Java中的一个信托基地是您存储您信任的外部系统证书的地方。这些证书不包含敏感信息,但是识别并连接到第三方系统很重要。另一方面,密钥库包含专用访问密钥及其相应的访问证书,这是对客户端进行身份验证所需的。您不应该与任何人共享密钥库数据。

如果您冒险,可以手动创建这些文件(这是the guide如何做到这一点)。但是,您也可以使用方便的快捷方式,让Aiven平台为我们完成所有工作。

使用Aiven CLI运行koude1,并提供有关服务和用户的信息:

  • 您的服务名称 - 在创建期间定义的Apache Kafka服务的名称
  • your-user-name - 执行操作的用户名称(如果您有疑问,请运行avn service user-list --format '{username}' --project YOUR-PROJECT-NAME YOUR-SERVICE-NAME查看用户)
  • 密码 - 为您的密钥库和TrustStore选择一个安全密码

现在将这些字段应用于下面的命令并运行:

avn service user-kafka-java-creds YOUR-SERVICE-NAME --username YOUR-USER-NAME -d src/main/resources --password PASSWORD

如果一切顺利,您将看到 Resources 文件夹中出现六个新文件。 Aiven下载必要的证书,创建密钥库和Trustore,并将所有引用都放入一个文件 client.properties

6 new files that were added after running  raw ``avn service user-kafka-java-creds`` endraw

为了使阅读位于 client.properties 的设置变得更加容易,请在新的类Utils中添加一个小的静态方法loadProperties

package org.example;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Utils {
    public static Properties loadProperties() {
        Properties properties = new Properties();
        try (InputStream input = ProducerOneMessage.class.getClassLoader().getResourceAsStream("client-ssl.properties")) {
            if (input == null) {
                System.out.println("Sorry, unable to find config.properties");
            }
            properties.load(input);
            properties.put("key.serializer", StringSerializer.class.getName());
            properties.put("value.serializer", StringSerializer.class.getName());
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        return properties;
    }
}

恭喜!您已经完成了配置设置。

向ApacheKafka®集群派遣活动

是时候将数据发送到ApacheKafka®群集了。为此,您需要一个生产者。
在您的项目中,创建一个名为Producer的新Java类,并在其中添加主要方法。

要发送一条消息,您需要执行以下四个步骤:

public class Producer {
    public static void main(String[] args) {
        // Step # 1: create a producer and connect to the cluster
        // Step # 2: define the topic name
        // Step # 3: create a message record
        // Step # 4: send the record to the cluster
    }
}

对于每个步骤,您可以依靠官方ApacheKafka®客户端库为Java提供的功能,您先前添加为依赖项。

这是您必须导入的生产者类才能工作的方法:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

使用Logger实例来记录事件也是一个好主意。

Logger logger = LoggerFactory.getLogger(Producer.class.getName());

步骤1:创建生产者并连接到集群

来自 apache kafka客户端库中的KafkaProducer的构造函数期望属性列表建立连接。您已经做了大部分繁重的举重来定义上一节中的连接配置。现在,只需用您上面添加的有用的实用方法Utils.loadProperties()引用这些条目。

// step # 1: create a producer and connect to the cluster
// get connection data from the configuration file

Properties properties = Utils.loadProperties();
KafkaProducer<String,String> producer =
        new KafkaProducer<>(properties);

您还需要定义的另一件事是序列化数据的格式。在此示例中,我们将以StringSerializer的形式发送JSON对象。您还应该为密钥添加一个序列化器。即使您不需要在第一个示例中明确使用键,但指定key.serializer是强制性的。

properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());

现在您拥有一组属性来建立连接,您可以创建一个KafkaProducer的实例并将属性传递到其构造函数:

KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

此时,您没有向群集发送任何数据。但是,运行Producer以查看如何建立服务器的连接以及是否存在任何错误很有用:

establishing-connection

步骤2:定义主题名称

将数据发送到集群时,您需要定义一个主题以将消息发送到。

我创建了一个名为客户活动的主题,该主题在在线商店中记录了客户的活动。您可以更具创造力,并为您的消息选择一个不同的主题!

String topicName = "customer-activity";

请注意,一旦您选择了主题的名称,就需要在ApacheKafka®群集的Aiven中创建它。即使您可以配置ApacheKafka®以在消息到达时自动创建主题,但最好将该选项保留为禁用,以避免意外地创建一堆不必要的主题。您可以使用方便的CLI快捷方式koude13在ApacheKafka®中创建一个主题,或通过Aiven Console关注these steps to create a topic

这是我使用的主题的配置,您可以看到它包含三个分区和三个复制:

Screenshot that shows adding a new topic through Aiven's console

步骤#3:创建消息记录

可以以各种格式发送消息:字符串,JSON,AVRO,PROTOBUF等。实际上,Kafka对您要发送的数据结构没有任何意见,这使得平台非常灵活。有时这会变得凌乱,但是Karapace, Aiven's open source schema registry可以帮助您在需要时更好地组织数据。

为了简单起见,将JSON用于此示例,并定义具有三个属性的对象:客户名称,执行的操作以及受影响的产品。

JSONObject message = new JSONObject();
message.put("customer", "Judy Hopps🐰");
message.put("product", "Carrot 🥕");
message.put("operation", "ordered");

通过将主题名称和消息传递给构造函数来创建一个新的ProductRecord实例:

// package the message in the record
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message.toString());
logger.info("Record created: " + record);

请注意,使用<String, String>表明生产者期望String格式中的密钥和值。

步骤#4:将记录发送到集群

最后,要将消息发送到Apache群集主题,请调用生产者实例的send()方法并提供记录:

producer.send(record);
producer.flush();
producer.close();

要运行生产者,请在IDE的帮助下调用Producer类的main()方法。另外,您可以使用Gradle并设置任务来运行生产者,就像它已经完成了in the accompanying repository一样。您应该看到与此类似的输出:

Screenshot showing running producer that sent a single message to the cluster

生产者的send()方法还接受回调接口,该接口为我们提供了元数据和有关例外的信息。您可以通过进行以下更改来介绍它:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if(exception == null) {
            logger.info("Sent successfully. Metadata: " + metadata.toString());
        } else {
            exception.printStackTrace();
        }
    }
});
producer.flush();
producer.close();

RecordMetadataCallback将需要额外的进口:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;

发送多条消息

太好了,您成功地向集群发送了一条消息!但是,一一发送消息是乏味的。转向消费者之前,将代码转换为模仿数据流的连续(即使过度简化)。

为此,让我们分开生成消息的方法:


static final String[] operations = {"searched", "bought"};
static final String[] customers = {"Judy Hopps🐰", "Nick Wilde🦊", "Chief Bogo🐃", "Officer Clawhauser😼", "Mayor Lionheart 🦁", "Mr. Big 🪑", "Fru Fru💐"};
static final String[] products = {"Donut 🍩", "Carrot 🥕", "Tie 👔", "Glasses 👓️️", "Phone ☎️", "Ice cream 🍨", "Dress 👗", "Pineapple pizza 🍕"};

public static JSONObject generateMessage() {
    JSONObject message = new JSONObject();

    // randomly assign values
    Random randomizer = new Random();
    message.put("customer", customers[randomizer.nextInt(7)]);
    message.put("product", products[randomizer.nextInt(7)]);
    message.put("operation", operations[randomizer.nextInt(30) < 25 ? 0 : 1]); // prefer 'search' over 'bought'

    return message;
}

现在结合了在循环时无限内生成和发送数据的步骤。请注意,使用while(true)Thread.sleep不是您在生产环境中要做的事情,但是出于我们的目的,它们运行良好:

try (KafkaProducer<String,String> producer = new KafkaProducer<>(properties)) {
    // step # 2: define the topic name
    String topicName = "customer-activity";

    // step # 3: generate and send message data
    while(true) {
        // generate a new message
        JSONObject message = generateMessage();

        // package the message in a record
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, message.toString());
        logger.info("Record created: " + record);

        // send data
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null) {
                    logger.info("Sent successfully. Metadata: " + metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        });
        Thread.sleep(1000);
    }
}
}

现在运行生产者时,您将记录连续发送到集群中:

Screenshot showing the producer sending multiple messages to the Kafka topic

消耗来自Apache Kafka主题的数据

现在,消息已由生产者生成并发送到集群中,您可以创建一个消费者来进行轮询和处理这些消息。

创建简单的消费者可以分为三个步骤:

public class Consumer {
    public static void main(String[] args) {
        // Step # 1: create a consumer and connect to the cluster
        // Step # 2: subscribe consumer to the topics
        // Step # 3: poll and process new data
    }
}

这是以下代码的导入:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

步骤1:创建消费者并连接到集群

类似于您配置生产商的属性的方式,您需要为消费者指定连接信息。

// step # 1: create a consumer and connect to the cluster
// get connection data from the configuration file
Properties properties = Utils.loadProperties();
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
properties.put("group.id", "first");
properties.put("auto.offset.reset", "earliest"); //choose from earliest/latest/none

除了您用于生产者的属性外,消费者还有几个新的。首先,消费者需要能够对其从集群中读取的数据进行估算,因此,您不是定义次要化的序列化属性:

properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());

您还需要将消费者分配给消费者群体。通过指定group.id
来执行此操作

properties.put("group.id", "first");

您应该定义的最后一件事是,当消费者首先连接到主题时,消费者应开始阅读数据。您可以定义特定的偏置,或者,或者指向该主题中当前存在的最早或最新消息。将auto.offset.reset设置为earliest从一开始读取消息。

properties.put("auto.offset.reset", "earliest"); 

使用您定义的连接属性,您可以创建消费者:

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

步骤2:将消费者订阅该主题

将消费者订阅一个或多个主题:

String topicName = "customer-activity";
consumer.subscribe(Collections.singleton(topicName));

步骤3:民意调查和处理新数据

最后一步是从ApacheKafka®主题定期进行投票数据。为此,使用poll()方法并指定消费者应等待新消息到达的时间。

// step # 3 poll andprocess new data
while (true) {
    // poll new data
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // process new data
    for (ConsumerRecord<String, String> record : records) {
        logger.info("message: " + record.value());
    }
}

最后,是时候开始消费者阅读生产者发送的所有数据了。同样,您可以使用IDE的帮助来运行themain()方法,也可以使用 gradle 的功能,请参见随附的存储库中的how it's done

Screenshot showing consumer polling and printing out data

维护每个客户的消息顺序

使用生产者和消费者创建,您现在可以从Kafka群集发送和阅读数据。
但是,如果您仔细查看记录,您可能会注意到,消费者所阅读的记录顺序与生产者发送的记录不同。

即使这是分布式系统的自然副作用,您也经常想维护整个消息的顺序。在单独的博客文章ways to balance your data across Apache Kafka® partitions中详细讨论了这一挑战。在这篇文章中,我们将使用该文章中建议的一种策略:在钥匙的帮助下保留消息的顺序。

在一家在线商店中,客户执行的运营顺序很重要。客户首先将产品添加到篮子中,然后为此付费,然后才能派遣商品。为了在平衡跨分区的数据时维护与每个客户相关的消息的顺序,您可以将客户的id用作密钥。

在制作记录时,在生产者方面,指定记录的键:

// create a producer record
String key = message.get("customer").toString();
String value = message.toString();
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

要查看此更改对消费者端的影响,请在处理数据时从经纪人中打印出每个记录的分区和偏移:

for (ConsumerRecord<String,String> record : records) {
    logger.info("partition " + record.partition() +
            "| offset " + record.offset() +
            "| " + record.value() );
}

现在您可以运行更新的生产商和消费者。在消费者输出中,请注意,每个客户的数据始终添加到同一分区中。这样,即使可以重新安装有关客户的消息,与同一客户有关的消息仍保持其原始订单。

您可以使用客户执行的每次单独的购物旅行作为钥匙来进一步改善此设置。客户进行多次购物旅行,但是每次旅行都是唯一的,并且包含一系列事件,这些事件必须在消费时保持完全相同的顺序。购物之旅的记录比客户的整体活动少,因此导致分区不平衡的可能性更少。

最后的想法和下一步

在本文中,我们介绍了开始使用官方Java客户端库开始使用Apache Kafka的第一步。您可以在a github repository中找到本文使用的代码。

使用Apache Kafka时,仍然有很多东西要发现,因此,如果您想了解更多信息,请查看这些文章:

Apache Kafka® simply explained
Apache Kafka® key concepts, A glossary of terms related to Apache Kafka
Ways to balance your data across Apache Kafka® partitions
What is Karapace? Find out more about the magic that is the schema registry!

或围绕我们的Apache Kafka documentation戳,然后尝试Aiven for Apache Kafka