1. Overview

In today’s event-driven architectures, managing data streams effectively is essential. Apache Kafka is a popular choice for this, but integrating it into our applications still comes with challenges, even with helper frameworks such as Spring Kafka. One major challenge is implementing proper dynamic listener management, which gives us the flexibility and control that are crucial for adapting to our application’s changing workloads and for performing maintenance.

In this tutorial, we’ll learn how to dynamically start and stop Kafka listeners in Spring Boot applications.

2. Prerequisites

First, let’s add the spring-kafka dependency to our project:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

3. Configuring the Kafka Consumer

Producers are applications that publish (write) events to Kafka topics.

Throughout this tutorial, we’ll use unit tests to simulate producers sending events to Kafka topics. Consumers, which subscribe to topics and process the stream of events, are represented by a listener within our application that’s configured to process the incoming messages from Kafka.

Let’s configure our Kafka consumer through the KafkaConsumerConfig class, which includes the Kafka broker’s address and the deserializers for the key and value (we’ll assign the consumer group ID later, on the listener itself):

@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // read from the earliest offset when no committed offset exists for the group
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // allow the JsonDeserializer to instantiate types from our package
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class));
}
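The @KafkaListener we’ll declare next references a container factory bean named kafkaListenerContainerFactory. That bean isn’t shown in the original configuration; a minimal sketch, wired to the consumerFactory() we just defined, might look like this:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}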

4. Configuring the Kafka Listener

In Spring Kafka, annotating a method with @KafkaListener creates a listener that consumes messages from a specified topic. To define it, let’s declare a UserEventListener class:

@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
  containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
    logger.info("Received UserEvent: " + userEvent.getUserEventId());
    userEventStore.addUserEvent(userEvent);
}

The above listener waits for messages from the topic multi_partition_topic and processes them using the processUserEvent() method. We assign the groupId as test-group, ensuring that the consumer becomes part of a broader group, thus facilitating distributed processing across multiple instances.

We assign each listener a unique identifier using the id attribute. In this example, the assigned listener ID is listener-id-1.

The autoStartup property gives us control over whether the listener starts when the application initializes. In our example, we set it to false, so the listener doesn’t start automatically when the application boots up. This configuration gives us the flexibility to start the listener manually.

This manual initiation can be triggered by various events, such as a new user registration, a specific condition within the application, such as reaching a certain data volume threshold, or an administrative action, such as manually starting the listener via a management interface. For example, if an online retail application detects a surge in traffic during a flash sale, it could automatically start additional listeners to handle the increased load, optimizing performance.
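It’s also worth knowing that autoStartup resolves property placeholders, so we could externalize this decision to configuration instead of hard-coding it. For example, with a hypothetical listen.auto.start property:

@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC,
  autoStartup = "${listen.auto.start:false}")
public void processUserEvent(UserEvent userEvent) {
    // ...
}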

The UserEventStore acts as a temporary storage for events received by the listener:

@Component
public class UserEventStore {

    // the listener thread and the test thread access this list concurrently,
    // so we use a thread-safe implementation
    private final List<UserEvent> userEvents = new CopyOnWriteArrayList<>();

    public void addUserEvent(UserEvent userEvent) {
        userEvents.add(userEvent);
    }

    public List<UserEvent> getUserEvents() {
        return userEvents;
    }

    public void clearUserEvents() {
        this.userEvents.clear();
    }
}
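For completeness, the UserEvent referenced throughout is a simple POJO. Its original source isn’t shown here, but judging by the calls above, a minimal sketch with a single userEventId field might look like this:

public class UserEvent {

    private String userEventId;

    // a no-arg constructor is needed for JSON deserialization
    public UserEvent() {
    }

    public UserEvent(String userEventId) {
        this.userEventId = userEventId;
    }

    public String getUserEventId() {
        return userEventId;
    }
}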

5. Dynamically Controlling the Listener

Let’s create a KafkaListenerControlService that dynamically starts and stops Kafka listeners using KafkaListenerEndpointRegistry:

@Service
public class KafkaListenerControlService {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void startListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && !listenerContainer.isRunning()) {
            listenerContainer.start();
        }
    }

    public void stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && listenerContainer.isRunning()) {
            listenerContainer.stop();
        }
    }
}

The KafkaListenerControlService can precisely manage individual listener instances based on their assigned IDs. Both the startListener() and stopListener() methods use the listenerId as a parameter, allowing us to start and stop message consumption from the topic as required.

KafkaListenerEndpointRegistry acts as a central repository for all Kafka listener endpoints defined within a Spring application context. It monitors these listener containers, thus permitting programmatic control over their state, be it starting, stopping, or pausing. This capability is essential for applications that need to adjust their message processing activities in real-time, without necessitating a restart of the entire application.
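For instance, to provide the kind of management interface mentioned earlier, we could expose these operations over REST. The following controller is purely illustrative and not part of the reference code:

@RestController
@RequestMapping("/listeners")
public class KafkaListenerController {

    @Autowired
    private KafkaListenerControlService controlService;

    // hypothetical endpoint: start the listener with the given ID
    @PostMapping("/{listenerId}/start")
    public ResponseEntity<Void> start(@PathVariable String listenerId) {
        controlService.startListener(listenerId);
        return ResponseEntity.ok().build();
    }

    // hypothetical endpoint: stop the listener with the given ID
    @PostMapping("/{listenerId}/stop")
    public ResponseEntity<Void> stop(@PathVariable String listenerId) {
        controlService.stopListener(listenerId);
        return ResponseEntity.ok().build();
    }
}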

6. Validating the Dynamic Listener Controls
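Next, let’s test the dynamic start and stop capabilities of Kafka listeners within our Spring Boot application. The snippets below run inside an integration test that wires together the control service, the event store, and a plain KafkaProducer. A minimal sketch of that setup, with the broker address and test class name as assumptions, might look like this:

@SpringBootTest
class StartStopListenerLiveTest {

    @Autowired
    private KafkaListenerControlService kafkaListenerControlService;

    @Autowired
    private UserEventStore userEventStore;

    private KafkaProducer<String, UserEvent> producer;

    @BeforeEach
    void setUp() {
        Map<String, Object> props = new HashMap<>();
        // the broker address is an assumption; adjust it to the test environment
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producer = new KafkaProducer<>(props, new StringSerializer(), new JsonSerializer<>());
    }
}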

With the setup in place, let’s first start the listener:

kafkaListenerControlService.startListener(Constants.LISTENER_ID);

We then verify that the listener is activated by sending and then processing a test event:

UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); 
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); 
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); 
this.userEventStore.clearUserEvents();

Now that the listener is active, we’ll send a batch of ten messages for processing. After sending four messages, we’ll stop the listener and then send the remaining messages to the Kafka topic:

for (long count = 1; count <= 10; count++) {
    UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
    RecordMetadata metadata = future.get();
    if (count == 4) {
        await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
        this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
        this.userEventStore.clearUserEvents();
    }
    logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}

Before starting the listener, we’ll verify that there are no messages in the event store:

assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);

Once the listener starts again, it processes the remaining six messages that we sent to the Kafka topic after the listener was stopped. This test showcases the Spring Boot application’s ability to dynamically manage Kafka listeners.

7. Use Cases

Dynamic listener management excels in scenarios requiring high adaptability. For example, during peak load times, we can dynamically start additional listeners to enhance throughput and reduce processing time. Conversely, during maintenance or low-traffic periods, we can stop listeners to conserve resources. This flexibility is also beneficial for deploying new features behind feature flags, allowing seamless on-the-fly adjustments without impacting the overall system.

Let’s consider a scenario where an e-commerce platform introduces a new recommendation engine designed to enhance the user experience by suggesting products based on browsing history and purchase patterns. To verify this feature’s effectiveness before its full-scale launch, we decide to deploy it behind a feature flag.

Activating this feature flag starts the Kafka listener. As end users interact with the platform, the recommendation engine, powered by Kafka listeners, processes the incoming stream of user activity data to generate personalized product recommendations.

When we deactivate the feature flag, we stop the Kafka listener, and the platform defaults to its existing recommendation engine. This ensures a seamless user experience, regardless of the testing phase of the new engine.
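To make this concrete, a hypothetical toggle hook (not part of the reference code) could wire such a flag to the control service we built earlier:

@Component
public class RecommendationFeatureToggle {

    @Autowired
    private KafkaListenerControlService controlService;

    // invoked whenever the feature flag changes; how the flag is evaluated
    // (configuration property, feature-flag library, admin action) is up to us
    public void onRecommendationFlagChanged(boolean enabled) {
        if (enabled) {
            controlService.startListener(Constants.LISTENER_ID);
        } else {
            controlService.stopListener(Constants.LISTENER_ID);
        }
    }
}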

While the feature is active, we actively collect data, monitor performance metrics, and make adjustments to the recommendation engine. We repeat this feature testing across multiple iterations until we achieve the desired results.

Through this iterative process, dynamic listener management proves to be a valuable tool. It allows for the seamless introduction of new features without impacting the overall system.

8. Conclusion

In this article, we’ve addressed Kafka integration with Spring Boot, focusing on dynamically managing Kafka listeners. This capability is crucial for managing fluctuating workloads and performing routine maintenance. Additionally, it enables feature toggling, scales services based on traffic patterns, and manages event-driven workflows with specific triggers.

As always, the source for the examples is available over on GitHub.