1. Overview
Apache Kafka is a powerful, distributed, fault-tolerant stream processing system. In a previous tutorial, we learned how to work with Spring and Kafka.
In this tutorial, we’ll build on the previous one and learn how to write reliable, self-contained integration tests that don’t rely on an external Kafka server running.
First, we’ll start by looking at how to use and configure an embedded instance of Kafka.
Then we’ll see how we can make use of the popular framework Testcontainers from our tests.
2. Dependencies
Of course, we’ll need to add the standard spring-kafka dependency to our pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
Then we’ll need two more dependencies specifically for our tests.
First, we’ll add the spring-kafka-test artifact:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>3.1.1</version>
    <scope>test</scope>
</dependency>
Finally, we’ll add the Testcontainers Kafka dependency, which is available on Maven Central:
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
Now that we have all the necessary dependencies configured, we can write a simple Spring Boot application using Kafka.
3. A Simple Kafka Producer-Consumer Application
Throughout this tutorial, the focus of our tests will be a simple producer-consumer Spring Boot Kafka application.
Let’s start by defining our application entry point:
@SpringBootApplication
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}
As we can see, this is a standard Spring Boot application.
3.1. Producer Setup
Next, let’s consider a producer bean that we’ll use to send messages to a given Kafka topic:
@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
Our KafkaProducer bean defined above is merely a wrapper around the KafkaTemplate class. This class provides high-level thread-safe operations, such as sending data to the provided topic, which is exactly what we do in our send method.
3.2. Consumer Setup
Likewise, we’ll now define a simple consumer bean that will listen to a Kafka topic and receive messages:
@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

    // other getters
}
Our simple consumer uses the @KafkaListener annotation on the receive method to listen to messages on a given topic. We’ll see later how we configure the test.topic from our tests.
Furthermore, the receive method stores the message content in our bean and decrements the count of the latch variable. This variable is a simple thread-safe counter field that we’ll use later from our tests to ensure we successfully received a message.
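To make the role of the latch concrete, here’s a minimal sketch that uses only the JDK and no Kafka at all. The class LatchHandoffSketch and its awaitMessage method are hypothetical names made up for this illustration; a plain thread stands in for the listener container that would normally call receive:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer bean; no Kafka is involved here.
final class LatchHandoffSketch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Written before countDown(), so it is visible to a thread once await() returns true.
    private String payload;

    // Plays the role of the listener method: store the message, then release any waiter.
    void receive(String message) {
        payload = message;
        latch.countDown();
    }

    // Waits until receive() has run or the timeout elapses; returns false on timeout.
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    String getPayload() {
        return payload;
    }

    public static void main(String[] args) {
        LatchHandoffSketch consumer = new LatchHandoffSketch();

        // Simulate a message arriving on another thread.
        new Thread(() -> consumer.receive("hello")).start();

        boolean received = consumer.awaitMessage(5, TimeUnit.SECONDS);
        System.out.println("received=" + received + ", payload=" + consumer.getPayload());
    }
}
```

The timeout is what keeps a test from hanging forever: if no message arrives, the wait simply returns false and the assertion fails.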
Now that we have our simple Kafka application using Spring Boot implemented, let’s see how we can write integration tests.
4. A Word on Testing
In general, when writing clean integration tests, we shouldn’t depend on external services that we can’t control or that might suddenly stop working. Such dependencies can make our test results unreliable.
Similarly, if we’re dependent on an external service, in this case, a running Kafka broker, we likely won’t be able to set it up, control it and tear it down in the way we want from our tests.
4.1. Application Properties
We’re going to use a very light set of application configuration properties from our tests.
We’ll define these properties in our src/test/resources/application.yml file:
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic
This is the minimum set of properties that we need when working with an embedded instance of Kafka or a local broker.
Most of these are self-explanatory, but the one we should highlight is the consumer property auto-offset-reset: earliest. This property ensures that our consumer group gets the messages we send, because the listener container might start after the sends have completed.
Additionally, we configure a topic property with the value embedded-test-topic, which is the topic we’ll use from our tests.
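To see why the auto-offset-reset setting matters, here’s a toy model of a single partition written with plain JDK collections. It’s only a sketch of the reasoning and not the Kafka client API: OffsetResetSketch, append and poll are hypothetical names, and we assume a consumer group that has no committed offset yet:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition; this is NOT the Kafka client API.
final class OffsetResetSketch {

    private final List<String> log = new ArrayList<>();

    // A producer appends a record to the end of the log.
    void append(String record) {
        log.add(record);
    }

    // Where a group without a committed offset starts reading.
    int startOffset(String autoOffsetReset) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0;          // beginning of the log
            case "latest":
                return log.size(); // only records appended from now on
            default:
                throw new IllegalArgumentException("unsupported value: " + autoOffsetReset);
        }
    }

    // Records a newly joined consumer would see on its first read.
    List<String> poll(String autoOffsetReset) {
        return new ArrayList<>(log.subList(startOffset(autoOffsetReset), log.size()));
    }

    public static void main(String[] args) {
        OffsetResetSketch partition = new OffsetResetSketch();
        partition.append("sent before the consumer joined");

        System.out.println("earliest sees: " + partition.poll("earliest"));
        System.out.println("latest sees:   " + partition.poll("latest"));
    }
}
```

In this model, a consumer that joins late with latest (Kafka’s default) misses the record that was already sent, while earliest still picks it up.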
5. Testing Using Embedded Kafka
In this section, we’ll take a look at how to use an in-memory Kafka instance to run our tests against. This is also known as Embedded Kafka.
The dependency spring-kafka-test we added previously contains some useful utilities to assist with testing our application. Most notably, it contains the EmbeddedKafkaBroker class.
With that in mind, let’s go ahead and write our first integration test:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";

        producer.send(topic, data);

        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}
Let’s walk through the key parts of our test.
First, we start by decorating our test class with two pretty standard Spring annotations:
- The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context.
- We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.
Here comes the crucial part: we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests.
Moreover, there are several properties available we can use to configure the embedded Kafka node:
- partitions – This is the number of partitions used per topic. To keep things nice and simple, we only want one to be used from our tests.
- brokerProperties – additional properties for the Kafka broker. Again, we keep things simple and specify a plain text listener and a port number.
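Because we pin the listener to port 9092, our test depends on that port being free on the machine that runs it. The following JDK-only sketch illustrates that failure mode with plain sockets. It doesn’t show how the embedded broker binds its port; PortClashSketch, listenOnFreePort and canBind are hypothetical helpers written for this example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// JDK-level illustration of a port clash; unrelated to spring-kafka internals.
final class PortClashSketch {

    // Port 0 asks the operating system for any currently free port.
    static ServerSocket listenOnFreePort() {
        try {
            return new ServerSocket(0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if a new listener could be opened on the given port right now.
    static boolean canBind(int port) {
        try (ServerSocket ignored = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false; // typically "Address already in use"
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket held = listenOnFreePort()) {
            int port = held.getLocalPort();
            System.out.println("OS-assigned port: " + port);
            System.out.println("second bind possible while held: " + canBind(port));
        }
    }
}
```

A fixed port behaves like the second bind here whenever another process already holds it, which is one reason to prefer dynamically assigned ports in tests.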
Next, we auto-wire our consumer and producer classes and configure a topic to use the value from our application.yml.
For the final piece of the puzzle, we simply send a message to our test topic and verify that the message has been received and that its payload contains the data we sent.
When we run our test, here’s what we’ll see among the verbose Spring output:
...
12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO c.b.kafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
key = null, value = Sending with our own simple KafkaProducer)'
This confirms that our test is working properly. Awesome! We now have a way to write self-contained, independent integration tests using an in-memory Kafka broker.
6. Testing Kafka With Testcontainers
Sometimes we might see small differences between a real external service and an embedded in-memory instance of that service provided specifically for testing. Although unlikely, the port used by our test might also already be occupied, causing a failure.
With that in mind, in this section we’ll see a variation on our previous approach to testing using the Testcontainers framework. We’ll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.
Let’s define another integration test that will be quite similar to the one we saw in the previous section:
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived()
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";

        producer.send(topic, data);

        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}
Let’s take a look at the differences. We’re declaring the kafka field, which is a standard JUnit 4 @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.
To avoid port clashes, Testcontainers allocates a port number dynamically when our Docker container starts.
For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}
We then reference this configuration via the @Import annotation at the beginning of our test.
The reason for this is that we need a way to inject the server address into our application, which as previously mentioned is generated dynamically.
We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:
bootstrap.servers = [PLAINTEXT://localhost:32789]
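The value has the shape protocol://host:port, so if we ever want to inspect the dynamically assigned port (for example, to log it), we can take the address apart with java.net.URI. This is only a sketch that assumes a single listener address of the form shown above; BootstrapAddressSketch and its two methods are hypothetical helpers, not part of Testcontainers:

```java
import java.net.URI;

// Splits a single listener address such as PLAINTEXT://localhost:32789.
final class BootstrapAddressSketch {

    // Host part of the listener address.
    static String host(String listener) {
        return URI.create(listener).getHost();
    }

    // Port part of the listener address, or -1 if none is present.
    static int port(String listener) {
        return URI.create(listener).getPort();
    }

    public static void main(String[] args) {
        // Example value copied from the output above; a real run yields a different port.
        String address = "PLAINTEXT://localhost:32789";
        System.out.println(host(address) + " " + port(address));
    }
}
```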
Now when we run our test, we should see that Testcontainers does several things:
- Checks our local Docker setup
- Pulls the confluentinc/cp-kafka:5.4.3 Docker image if necessary
- Starts a new container and waits for it to be ready
- Finally, shuts down and deletes the container after our test finishes
Again, this is confirmed by inspecting the test output:
13:33:10.396 [main] INFO 🐳 [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO 🐳 [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO 🐳 [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
Presto! A working integration test using a Kafka Docker container.
7. Conclusion
In this article, we learned about a couple of approaches for testing Kafka applications with Spring Boot.
In the first approach, we saw how to configure and use a local in-memory Kafka broker.
Then we saw how to use Testcontainers to set up an external Kafka broker running inside a Docker container from our tests.
As always, the full source code of the article is available over on GitHub.