使用模拟生产商的单位测试Kafka生产商
#测试 #java #kafka

有时您的Kafka生产者代码正在做需要正确验证的事情,当然,我们开发人员求助于编写测试。如果我们要测试的功能很好地封装了,我们可以使用单元测试来做到这一点。 Kafka通过提供了称为“称为”的生产者<>接口的模拟实现,您猜对了,MockProducer

准备测试

TransactionProcessor类以下是我们正在测试的类。它具有一个process(Transaction)方法,该方法接收一个事务对象,在我们的示例中仅包含userIdamount属性。取决于处理器将决定编写哪个主题的数量。如果金额高于100.000,则将使用transactions_high_prio主题。否则,它将为transactions_regular_prio主题写一笔交易。

public class TransactionProcessor {
    public static final double HIGH_PRIORITY_THRESHOLD = 100.000;
    private final Producer<String, String> kafkaProducer;
    private final String highPrioTopic;
    private final String regularPrioTopic;

    private final Gson gson = new Gson();

    public TransactionProcessor(Producer<String, String> kafkaProducer, String highPrioTopic, String regularPrioTopic) {
        this.kafkaProducer = kafkaProducer;
        this.highPrioTopic = highPrioTopic;
        this.regularPrioTopic = regularPrioTopic;
    }

    public void process(Transaction transaction){
        String selectedTopic = regularPrioTopic;
        if (transaction.getAmount() >= HIGH_PRIORITY_THRESHOLD) {
            selectedTopic = highPrioTopic;
        }
        String transactionJson = gson.toJson(transaction);
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(selectedTopic, transaction.getUserId(), transactionJson);
        kafkaProducer.send(record);
    }
}

和交易类看起来像这样:

public class Transaction {
    private String userId;
    private double amount;
    //removed for brevity
}

这里要注意的重要一件事是,TransactionProcessor使用Producer接口,而不是实现(这是Kafkaproducer类)。这将使使用MockProducer进行单位测试我们的适配器。

模拟生产者在行动中

好,现在进入测试课程。 TransactionProcessorTest创建了我们将提供给Transaction Processor的模拟生产商的实例。

class TransactionProcessorTest {

    private static final String HIGH_PRIO_TOPIC = "transactions_high_prio";
    private static final String REGULAR_PRIO_TOPIC = "transactions_regular_prio";
    MockProducer<String, String> mockProducer = 
        new MockProducer<>(true, new StringSerializer(), new StringSerializer());

MockProducer构造函数在我们的情况下,采用了几个参数,即键和值序列化器。第一个参数autocomplete是一个布尔值,它告诉模拟生产商立即自动完成所有请求。在常规测试中,您需要将此参数设置为true,以便立即发送消息。如果将其设置为false,则需要在调用send()方法后明确调用completeNext()errorNext(RuntimeException)方法。您想对此做到这一点测试生产者中的错误处理(通过提供要作为errornext方法的参数处理的异常)。

我们创建了模拟生产商后,我们创建了希望测试的类的实例。

TransactionProcessor processor = 
    new TransactionProcessor(mockProducer, HIGH_PRIO_TOPIC, REGULAR_PRIO_TOPIC);

现在是时候测试基于金额的主题是否正确了。我们将创建两个交易对象,一个对象较低,第二个对象高于我们的阈值(即100.000)。

    @Test
    public void testPrioritySelection(){
        Double lowAmount = 50.2d;
        Double highAmount = 250000d;
        Transaction regularPrioTransaction = new Transaction("user1", lowAmount);
        processor.process(regularPrioTransaction);
        Transaction highPrioTransaction = new Transaction("user2", highAmount);
        processor.process(highPrioTransaction);

        assertThat(mockProducer.history()).hasSize(2);

        ProducerRecord<String, String> regTransactionRecord = mockProducer.history().get(0);
        assertThat(regTransactionRecord.value()).contains(lowAmount.toString());
        assertThat(regTransactionRecord.topic()).isEqualTo(REGULAR_PRIO_TOPIC);

        ProducerRecord<String, String> highTransactionRecord = mockProducer.history().get(1);
        assertThat(highTransactionRecord.value()).contains(highAmount.toString());
        assertThat(highTransactionRecord.topic()).isEqualTo(HIGH_PRIO_TOPIC);
    }

拨打处理器后处理(â€)方法两次后,我们想验证有两个记录发送给Kafka。为此,我们使用模拟生产者#history()方法,该方法返回生产者收到的记录列表,这些记录已将其发送给Kafka。我们从历史记录中获取每个记录,以确保将其发送到适当的主题。

Github上的代码

此博客文章中的所有代码示例可在Coding Harbour’s GitHub中找到。

您想了解有关卡夫卡的更多信息吗?

我创建了一个Kafka迷你课程,您可以绝对免费。在Coding Harbour上注册。

照片来源:@paulschnuerle