今天,让我们谈谈如何测试使用Apache Kafka作为sistema de mensageria的Spring Boot应用程序。为了确保在集成测试中一切正常工作,让我们使用TestContainers库,这使我们能够创建一个自动化和隔离的测试环境。
Apache Kafka是一个流行的分布式流媒体平台,用于实时消息传递。当我们开发使用KAFKA的应用程序时,必须验证系统组件即使在集成测试中也可以正常工作。
[testContainers(https://java.testcontainers.org/test_framework_integration/ Unit_5/)是一个Java库,简化了用于自动化测试的Docker遏制的创建和管理。它促进了测试环境的配置,确保复制和隔离生产环境。
我们开始之前,将码头工人安装在您的母亲中很重要。然后将必要的依赖性添加到项目的pom.xml文件:
<dependencies>
<!-- Dependências do Spring Boot e Kafka -->
<!-- ... -->
<!-- Dependência do Testcontainers para Kafka -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.0</version>
<scope>test</scope>
</dependency>
</dependencies>
用测试范围创建测试
现在,让我们创建一个集成测试,以检查我们的Spring Boot应用是否能够连接并将消息发送到Kafkat。
我们将开始为消费者和生产者创建Genhinia课程
...
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
public String getPayload() {
return payload;
}
}
...
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
创建一个称为 kafkaproducentegrationTest 的测试类后不久,并使用 @runwith(springrunner.class)和 @springbootest 编写它。这些笔记设置了弹簧启动测试上下文:
...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
@Autowired public KafkaTemplate<String, String> template;
@Autowired private KafkaConsumer consumer;
@Autowired private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Before
public void setup() {
consumer.resetLatch();
}
@Test
public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
String data = "Hello DevTo";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
@TestConfiguration
static class KafkaTestContainerConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
}
最后,将消费者和局部配置添加到 application.yml
文件
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: dev-to-test-group
test:
topic: dev-to-test-topic
在上面的示例中,我们导入必要的类,包括Kafka类,Junit测试类和Spring引导类。然后,我们定义了 @runwith(SpringRunner.Class),以指示使用Spring Runner进行测试,该测试允许启动弹簧上下文。
集成测试定义在KafkaproducentegrationTest类中。首先,我们使用TestContainers定义了KAFKA遏制。 Kafka遏制是用图像“ Confluentin/CP Kafka:7.4.0”初始化的。这种配置允许测试在孤立的Docker遏制中执行功能齐全的KAFKA环境。然后,我们使用@Autow的注释自动注入 kafkatemplate , kafkaconsumer 和 kafkaproducer 测试。我们还注入要使用的kafka t.pica的名称,该名称通过指出 @value(“ $ {test.topic}”)。
在每个测试之前,整个 setup()将执行以重新定义消费者的闩锁。此准备确保消费者准备接收消息。
现在准备了集成测试,我们可以执行它。在测试执行过程中,测试范围将负责创建和管理Kafka的Docker遏制。
您可以直接通过IDE或使用MVN测试命令执行测试。在执行过程中,您可能会看到显示的Kafka日志。
包括£o
简而言之,使用TestContainers的使用有助于为使用Apache Kafka的Spring引导应用程序创建集成测试。借助孤立和自动化的测试环境,我们可以确保系统组件正常运行,并在将应用程序植入生产之前识别集成问题。
在这篇文章中,我们探讨了如何使用TestContainers和Spring启动的Kafka配置和编写集成测试。我希望您发现了这种遏制,并且可以将这些概念应用于您的项目。如果您有任何DAN或建议,请在下面发表评论。
在上面的最新方式是处置存储库GIT