用TestContainers在春季启动时测试Kafka
#测试 #java #springboot #kafka

今天,让我们谈谈如何测试使用Apache Kafka作为sistema de mensageria的Spring Boot应用程序。为了确保在集成测试中一切正常工作,让我们使用TestContainers库,这使我们能够创建一个自动化和隔离的测试环境。

Apache Kafka是一个流行的分布式流媒体平台,用于实时消息传递。当我们开发使用KAFKA的应用程序时,必须验证系统组件即使在集成测试中也可以正常工作。

[testContainers(https://java.testcontainers.org/test_framework_integration/ Unit_5/)是一个Java库,简化了用于自动化测试的Docker遏制的创建和管理。它促进了测试环境的配置,确保复制和隔离生产环境。

我们开始之前,将码头工人安装在您的母亲中很重要。然后将必要的依赖性添加到项目的pom.xml文件:

<dependencies>
    <!-- Dependências do Spring Boot e Kafka -->
    <!-- ... -->
    <!-- Dependência do Testcontainers para Kafka -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.16.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

用测试范围创建测试

现在,让我们创建一个集成测试,以检查我们的Spring Boot应用是否能够连接并将消息发送到Kafkat。

我们将开始为消费者和生产者创建Genhinia课程

...
@Component
public class KafkaConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

  private CountDownLatch latch = new CountDownLatch(1);

  private String payload;

  @KafkaListener(topics = "${test.topic}")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    LOGGER.info("received payload='{}'", consumerRecord.toString());

    payload = consumerRecord.toString();
    latch.countDown();
  }

  public CountDownLatch getLatch() {
    return latch;
  }

  public void resetLatch() {
    latch = new CountDownLatch(1);
  }

  public String getPayload() {
    return payload;
  }
}

...

@Component
public class KafkaProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

  @Autowired private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String payload) {
    LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
    kafkaTemplate.send(topic, payload);
  }
}

创建一个称为 kafkaproducentegrationTest 的测试类后不久,并使用 @runwith(springrunner.class) @springbootest 编写它。这些笔记设置了弹簧启动测试上下文:

...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {

  @ClassRule
  public static KafkaContainer kafka =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

  @Autowired public KafkaTemplate<String, String> template;

  @Autowired private KafkaConsumer consumer;

  @Autowired private KafkaProducer producer;

  @Value("${test.topic}")
  private String topic;

  @Before
  public void setup() {
    consumer.resetLatch();
  }

  @Test
  public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
    String data = "Hello DevTo";

    producer.send(topic, data);

    boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
    assertTrue(messageConsumed);
    assertThat(consumer.getPayload(), containsString(data));
  }

  @TestConfiguration
  static class KafkaTestContainerConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }
  }
}

最后,将消费者和局部配置添加到 application.yml
文件

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: dev-to-test-group
test:
  topic: dev-to-test-topic

在上面的示例中,我们导入必要的类,包括Kafka类,Junit测试类和Spring引导类。然后,我们定义了 @runwith(SpringRunner.Class),以指示使用Spring Runner进行测试,该测试允许启动弹簧上下文。

集成测试定义在KafkaproducentegrationTest类中。首先,我们使用TestContainers定义了KAFKA遏制。 Kafka遏制是用图像“ Confluentin/CP Kafka:7.4.0”初始化的。这种配置允许测试在孤立的Docker遏制中执行功能齐全的KAFKA环境。

然后,我们使用@Autow的注释自动注入 kafkatemplate kafkaconsumer kafkaproducer 测试。我们还注入要使用的kafka t.pica的名称,该名称通过指出 @value(“ $ {test.topic}”)。

在每个测试之前,整个 setup()将执行以重新定义消费者的闩锁。此准备确保消费者准备接收消息。

现在准备了集成测试,我们可以执行它。在测试执行过程中,测试范围将负责创建和管理Kafka的Docker遏制。

您可以直接通过IDE或使用MVN测试命令执行测试。在执行过程中,您可能会看到显示的Kafka日志。

包括£o

简而言之,使用TestContainers的使用有助于为使用Apache Kafka的Spring引导应用程序创建集成测试。借助孤立和自动化的测试环境,我们可以确保系统组件正常运行,并在将应用程序植入生产之前识别集成问题。

在这篇文章中,我们探讨了如何使用TestContainers和Spring启动的Kafka配置和编写集成测试。我希望您发现了这种遏制,并且可以将这些概念应用于您的项目。如果您有任何DAN或建议,请在下面发表评论。
在上面的最新方式是处置存储库GIT