有时您的Kafka生产者代码正在做需要正确验证的事情,当然,我们开发人员求助于编写测试。如果我们要测试的功能很好地封装了,我们可以使用单元测试来做到这一点。 Kafka通过提供了称为“称为”的生产者<>接口的模拟实现,您猜对了,MockProducer
。
准备测试
TransactionProcessor
类以下是我们正在测试的类。它具有一个process(Transaction)
方法,该方法接收一个事务对象,在我们的示例中仅包含userId
和amount
属性。取决于处理器将决定编写哪个主题的数量。如果金额高于100.000,则将使用transactions_high_prio
主题。否则,它将为transactions_regular_prio
主题写一笔交易。
public class TransactionProcessor {
public static final double HIGH_PRIORITY_THRESHOLD = 100.000;
private final Producer<String, String> kafkaProducer;
private final String highPrioTopic;
private final String regularPrioTopic;
private final Gson gson = new Gson();
public TransactionProcessor(Producer<String, String> kafkaProducer, String highPrioTopic, String regularPrioTopic) {
this.kafkaProducer = kafkaProducer;
this.highPrioTopic = highPrioTopic;
this.regularPrioTopic = regularPrioTopic;
}
public void process(Transaction transaction){
String selectedTopic = regularPrioTopic;
if (transaction.getAmount() >= HIGH_PRIORITY_THRESHOLD) {
selectedTopic = highPrioTopic;
}
String transactionJson = gson.toJson(transaction);
ProducerRecord<String, String> record =
new ProducerRecord<>(selectedTopic, transaction.getUserId(), transactionJson);
kafkaProducer.send(record);
}
}
和交易类看起来像这样:
public class Transaction {
private String userId;
private double amount;
//removed for brevity
}
这里要注意的重要一件事是,TransactionProcessor
使用Producer
接口,而不是实现(这是Kafkaproducer类)。这将使使用MockProducer
进行单位测试我们的适配器。
模拟生产者在行动中
好,现在进入测试课程。 TransactionProcessorTest
创建了我们将提供给Transaction Processor的模拟生产商的实例。
class TransactionProcessorTest {
private static final String HIGH_PRIO_TOPIC = "transactions_high_prio";
private static final String REGULAR_PRIO_TOPIC = "transactions_regular_prio";
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
MockProducer
构造函数在我们的情况下,采用了几个参数,即键和值序列化器。第一个参数autocomplete
是一个布尔值,它告诉模拟生产商立即自动完成所有请求。在常规测试中,您需要将此参数设置为true
,以便立即发送消息。如果将其设置为false
,则需要在调用send()
方法后明确调用completeNext()
和errorNext(RuntimeException)
方法。您想对此做到这一点测试生产者中的错误处理(通过提供要作为errornext方法的参数处理的异常)。
我们创建了模拟生产商后,我们创建了希望测试的类的实例。
TransactionProcessor processor =
new TransactionProcessor(mockProducer, HIGH_PRIO_TOPIC, REGULAR_PRIO_TOPIC);
现在是时候测试基于金额的主题是否正确了。我们将创建两个交易对象,一个对象较低,第二个对象高于我们的阈值(即100.000)。
@Test
public void testPrioritySelection(){
Double lowAmount = 50.2d;
Double highAmount = 250000d;
Transaction regularPrioTransaction = new Transaction("user1", lowAmount);
processor.process(regularPrioTransaction);
Transaction highPrioTransaction = new Transaction("user2", highAmount);
processor.process(highPrioTransaction);
assertThat(mockProducer.history()).hasSize(2);
ProducerRecord<String, String> regTransactionRecord = mockProducer.history().get(0);
assertThat(regTransactionRecord.value()).contains(lowAmount.toString());
assertThat(regTransactionRecord.topic()).isEqualTo(REGULAR_PRIO_TOPIC);
ProducerRecord<String, String> highTransactionRecord = mockProducer.history().get(1);
assertThat(highTransactionRecord.value()).contains(highAmount.toString());
assertThat(highTransactionRecord.topic()).isEqualTo(HIGH_PRIO_TOPIC);
}
拨打处理器后处理(â€)方法两次后,我们想验证有两个记录发送给Kafka。为此,我们使用模拟生产者#history()方法,该方法返回生产者收到的记录列表,这些记录已将其发送给Kafka。我们从历史记录中获取每个记录,以确保将其发送到适当的主题。
Github上的代码
此博客文章中的所有代码示例可在Coding Harbour’s GitHub中找到。
您想了解有关卡夫卡的更多信息吗?
我创建了一个Kafka迷你课程,您可以绝对免费。在Coding Harbour上注册。
照片来源:@paulschnuerle