Finally, let’s implement the consumer for the main topic:
@KafkaListener(topics = { "payments" }, groupId = "payments")
public void handlePayment(
  Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on main topic={}, payload={}", topic, payment);
}
Before moving on to the DLT examples, we’ll discuss the retry configuration.
3.3. Turning Off Retries
In real-life projects, it's common to retry processing an event a few times before sending it to the DLT. We can achieve this with the non-blocking retries mechanism provided by Spring Kafka.
In this article, however, we’ll turn off the retries to highlight the DLT mechanism. An event will be published directly to the DLT when the consumer for the main topic fails to process it.
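To make the idea concrete, here is a minimal, framework-free sketch of "attempt N times, then dead-letter" routing. It does not use Spring Kafka; the class DltRoutingSketch, the method deliver, and the in-memory deadLetters list are hypothetical names introduced only for illustration. It assumes the first delivery counts as attempt number one, so a limit of one attempt means no retries at all:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of retry-then-dead-letter routing; not Spring Kafka code.
class DltRoutingSketch {

    // Calls the handler up to maxAttempts times in total (the first delivery is attempt 1).
    // Returns the number of attempts made. If every attempt fails,
    // the event is appended to deadLetters, which stands in for the DLT.
    static int deliver(String event, int maxAttempts,
                       Consumer<String> handler, List<String> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(event);
                return attempt; // processed successfully, nothing is dead-lettered
            } catch (RuntimeException e) {
                // processing failed; loop again if any attempts remain
            }
        }
        deadLetters.add(event);
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        Consumer<String> alwaysFails = e -> {
            throw new IllegalStateException("cannot process " + e);
        };

        // With a single attempt, the failing event goes straight to the dead letters.
        int attempts = deliver("payment-1", 1, alwaysFails, deadLetters);
        System.out.println("attempts=" + attempts + ", deadLetters=" + deadLetters);
    }
}
```

With maxAttempts set to 1, a failing event is dead-lettered after its first and only delivery, which is the behavior we're aiming for in this section.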
First, we need to define the producerFactory and the retryableTopicKafkaTemplate beans:
@Bean
public ProducerFactory<String, Payment> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaProducerFactory<>(
      config, new StringSerializer(), new JsonSerializer<>());
}

@Bean
public KafkaTemplate<String, Payment> retryableTopicKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
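Now we can define the consumer for the main topic without additional retries, as described earlier. Setting attempts to "1" allows only the initial delivery, so a failed event goes straight to the DLT: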
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate")
@KafkaListener(topics = { "payments" }, groupId = "payments")
public void handlePayment(
  Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on main topic={}, payload={}", topic, payment);
}