1. Overview
In this tutorial, we’ll discuss the importance of implementing retry in Kafka. We’ll explore the various options available for implementing it with Spring Boot, and learn best practices for maximizing the reliability and resilience of a Kafka consumer.
If we’re configuring Kafka on Spring for the first time, and want to learn more, we can start with an intro article to Spring and Kafka.
2. Project Setup
Let’s create a new Spring Boot project, and add the spring-kafka dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
Now let’s create a Greeting object to use as the message payload:
public class Greeting {

    private String msg;
    private String name;

    // standard constructors, getters and setters
}
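To have something to consume later, a producer can publish Greeting messages with a KafkaTemplate. This is a minimal sketch; the topic name “greeting” matches the listener we’ll configure later, and the template is assumed to be configured elsewhere with a JSON serializer:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class GreetingProducer {

    private final KafkaTemplate<String, Greeting> kafkaTemplate;

    public GreetingProducer(KafkaTemplate<String, Greeting> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Greeting greeting) {
        // "greeting" is the topic used throughout this tutorial
        kafkaTemplate.send("greeting", greeting);
    }
}
```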
3. Kafka Consumer
A Kafka Consumer is a client application that reads data from a Kafka cluster. It subscribes to one or more topics, and consumes published messages. Producers send messages to a topic, which is a category name where records are stored and published. Topics are divided into several partitions to allow them to scale horizontally. Each partition is an immutable sequence of messages.
Consumers can read messages from a specific partition by specifying an offset, which is the position of the message within the partition. An ack (acknowledgment) is a message sent by a consumer to a Kafka broker to indicate that it has successfully processed a record. The consumer offset will be updated once the ack is sent.
This ensures the message is consumed, and won’t be delivered to the current listener again.
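The consumption flow above can be sketched with a minimal listener; the group id here is an assumption for this example:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GreetingListener {

    @KafkaListener(topics = "greeting", groupId = "greeting-group")
    public void listen(Greeting greeting) {
        // once this method returns without throwing, the container can
        // acknowledge the record according to the configured ack mode
        System.out.println("Received: " + greeting);
    }
}
```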
3.1. Ack Mode
The ack mode determines when the broker updates the consumer’s offset.
There are three acknowledgment modes:
- auto-commit: the consumer sends an acknowledgment to the broker as soon as it receives a message
- after-processing: the consumer only sends an acknowledgment to the broker after it has successfully processed the message
- manual: the consumer waits until it receives specific instructions before sending an acknowledgment to the broker
In Spring Kafka, the ack mode is configured on the listener container.
Let’s create a bean that builds a ConcurrentKafkaListenerContainerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}
There are several ack modes available that we can configure:
- AckMode.RECORD: In this after-processing mode, the consumer sends an acknowledgment for each message it processes.
- AckMode.BATCH: In this mode, the consumer sends an acknowledgment for the whole batch of messages returned by a poll, rather than for each message.
- AckMode.COUNT: In this mode, the consumer sends an acknowledgment after it has processed a specific number of messages.
- AckMode.MANUAL: In this manual mode, the consumer doesn’t send acknowledgments automatically; the listener code has to acknowledge each message explicitly.
- AckMode.TIME: In this mode, the consumer sends an acknowledgment after a certain amount of time has passed.
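As a sketch of the manual mode, a listener method can receive an Acknowledgment argument and commit the offset explicitly (this assumes the container factory is configured with AckMode.MANUAL):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckListener {

    @KafkaListener(topics = "greeting", groupId = "manual-group")
    public void listen(Greeting greeting, Acknowledgment ack) {
        System.out.println("Processing: " + greeting);
        // the offset is committed only when we acknowledge explicitly
        ack.acknowledge();
    }
}
```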
To implement a retry logic for message processing in Kafka, we need to select an AckMode.
This AckMode should allow the consumer to indicate to the broker which specific messages have been successfully processed. This way, the broker can redeliver any unacknowledged messages to another consumer.
This may be the RECORD or MANUAL mode in the case of blocking retry.
4. Blocking Retry
A blocking retry enables the consumer to attempt consuming a message again if the initial attempt fails due to a temporary error. The consumer waits a certain amount of time, known as the retry backoff period, before trying to consume the message again.
Additionally, the consumer can customize the retry backoff period using either a fixed delay or an exponential backoff strategy. It can also set maximum retries before giving up and marking the message as failed.
4.1. Error Handler
Let’s define two properties on the Kafka configuration class:
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
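These placeholders resolve against the application properties; the values below are illustrative:

```properties
kafka.backoff.interval=9000
kafka.backoff.max_failure=5
```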
To handle all exceptions thrown during the consuming process, we’ll define a new error handler:
@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // logic to execute when all the retry attempts are exhausted
    }, fixedBackOff);
    return errorHandler;
}
The FixedBackOff class takes two arguments:
- interval: the amount of time to wait between retries in milliseconds
- maxAttempts: the maximum number of times to retry the operation before giving up
In this strategy, the consumer waits a fixed time before retrying the message consumption.
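Alternatively, the fixed delay can be replaced with Spring’s ExponentialBackOff; the values below are illustrative:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@Bean
public DefaultErrorHandler exponentialErrorHandler() {
    // start at 1 second and double the wait on each attempt
    ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxInterval(10_000L);    // cap a single wait at 10 seconds
    backOff.setMaxElapsedTime(60_000L); // stop retrying after one minute overall
    return new DefaultErrorHandler(backOff);
}
```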
The DefaultErrorHandler is initialized with a lambda function representing the logic to execute when all the retry attempts are exhausted.
The lambda function takes two arguments:
- consumerRecord: represents the Kafka record that caused the error
- exception: represents the exception that was thrown
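Instead of a custom lambda, Spring Kafka also ships a ready-made recoverer, DeadLetterPublishingRecoverer, which publishes exhausted records to a dead-letter topic (by default, the original topic name suffixed with .DLT). This sketch assumes a KafkaOperations bean is available:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public DefaultErrorHandler dltErrorHandler(KafkaOperations<Object, Object> template) {
    // after the retries are exhausted, publish the record to "<topic>.DLT"
    return new DefaultErrorHandler(
      new DeadLetterPublishingRecoverer(template), new FixedBackOff(interval, maxAttempts));
}
```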
4.2. Container Factory
Let’s wire the error handler into the container factory bean:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}
When a retry policy is present, we’ll set the ack mode to AckMode.RECORD to make sure the consumer redelivers messages if an error happens during processing.
We shouldn’t set the ack mode to AckMode.BATCH or AckMode.TIME, because the consumer would acknowledge multiple messages at once. If an error occurred while processing one message, the already-acknowledged messages in the same batch or time window wouldn’t be redelivered to the consumer.
So the retry policy wouldn’t be able to handle the error properly.
4.3. Retryable Exception and Not Retryable Exception
We can specify which exceptions are retryable and which are non-retryable.
Let’s modify the ErrorHandler:
@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // logic to execute when all the retry attempts are exhausted
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}
Here we specified which exception types should trigger a retry policy in the consumer.
The SocketTimeoutException is considered retryable, while the NullPointerException is considered non-retryable.
If we don’t set any retryable exceptions, the default classification is used: the DefaultErrorHandler retries all exceptions except a few it considers fatal, such as DeserializationException, ConversionException, and ClassCastException.
4.4. Advantages and Disadvantages
In the blocking retry, when message processing fails, the consumer blocks until the retry mechanism finishes its retries, or until the maximum number of retries is reached.
There are several advantages and disadvantages of using blocking retry.
Blocking retry can improve the reliability of the message processing pipeline by allowing the consumer to retry the consumption of a message if an error occurs. This can help to ensure that messages are processed successfully, even if transient errors occur.
Blocking retry can simplify the implementation of the message processing logic by abstracting away the retry mechanism. The consumer can focus on processing the message and leave the retry mechanism to handle any errors that may occur.
On the other hand, blocking retry may introduce delays in the message processing pipeline, since the consumer has to wait for the retry mechanism to complete its retries. This can impact the overall performance of the system. Blocking retry may also cause the consumer to consume more resources, such as CPU and memory, while it waits, which can impact the overall scalability of the system.
5. Non-Blocking Retry
Non-blocking retry allows the consumer to retry the consumption of a message asynchronously without blocking the execution of the message listener method.
5.1. @RetryableTopic
Let’s add the @RetryableTopic annotation to the KafkaListener:
@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L),
      attempts = "5",
      autoCreateTopics = "false",
      include = SocketTimeoutException.class,
      exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}
We customized the retry behavior by modifying several properties, such as:
- backoff: This property specifies the backoff strategy to use when retrying a failed message.
- attempts: This property specifies the maximum number of times a message should be retried before giving up.
- autoCreateTopics: This property specifies whether or not to automatically create the retry topic and DLT (Dead Letter Topic) if they don’t already exist.
- include: This property specifies the exceptions that should trigger a retry.
- exclude: This property specifies the exceptions that shouldn’t trigger a retry.
When the listener fails to process a message, the message is automatically sent to the retry topic for another attempt.
If it still can’t be processed after the maximum number of attempts, it’s sent to the DLT for further handling.
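With @RetryableTopic, a @DltHandler method in the same listener class can consume the messages that end up in the DLT; this is a sketch, with an illustrative method body:

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

// added inside the MultiTypeKafkaListener class above
@DltHandler
public void handleDlt(Greeting greeting, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // invoked once a message has exhausted all retry attempts
    System.out.println("Dead letter received on topic " + topic + ": " + greeting);
}
```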
5.2. Advantages and Disadvantages
There are several advantages to implementing a non-blocking retry:
- Improved performance: Non-blocking retries allow for the retrying of failed messages without blocking the calling thread, which can improve the overall performance of the application.
- Increased reliability: Non-blocking retries can help the application recover from failures and continue processing messages, even if some messages fail to be delivered.
However, there are also some potential disadvantages to consider when implementing non-blocking retries:
- Increased complexity: Non-blocking retries can add additional complexity to the application, as we’ll need to handle the retry logic and DLT.
- Risk of message duplication: If a message is successfully delivered after a retry, the message may be delivered multiple times if both the original delivery and the retry succeeded. We’ll need to consider this risk, and implement measures to prevent message duplication if it’s a concern.
- Message ordering: Retried messages are consumed from the retry topic asynchronously, so they may be processed later than newer messages from the original topic, breaking the original ordering.
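One common mitigation for duplicates is to make processing idempotent, for example by tracking already-processed message keys. A minimal in-memory sketch (in production this state would typically live in an external store shared across consumer instances):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DeduplicationGuard {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // returns true only the first time a given key is seen
    public boolean markProcessed(String key) {
        return processedKeys.add(key);
    }
}
```

A listener could then call markProcessed(record.key()) and skip the record when it returns false.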
6. Conclusion
In this article, we analyzed how to implement retry logic on a Kafka topic, including blocking and non-blocking approaches.
As always, the full source code of the examples can be found over on GitHub.