1. 概述

本文将探讨在Kafka中实现重试机制的重要性。我们将分析在Spring Boot中实现重试的多种方案,并学习提升Kafka消费者可靠性和弹性的最佳实践。

如果是首次在Spring中配置Kafka,建议先阅读Spring和Kafka入门文章

2. 项目搭建

创建新的Spring Boot项目,添加spring-kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

创建一个实体类:

public class Greeting {

    private String msg;
    private String name;

    // 标准构造器、getter和setter
}

3. Kafka消费者

Kafka消费者是从Kafka集群读取数据的客户端应用。它订阅一个或多个主题(topic),消费已发布的消息。生产者将消息发送到主题(存储和发布记录的类别)。主题被划分为多个分区(partition)以实现水平扩展。每个分区都是不可变的消息序列。

消费者可以通过指定偏移量(offset,即消息在分区中的位置)来读取特定分区的消息。确认(ack)是消费者发送给Kafka代理(broker)的消息,表示已成功处理记录。确认发送后,消费者偏移量才会更新。

这确保消息被消费,且不会再次传递给当前监听器。

3.1. 确认模式

确认模式决定代理何时更新消费者偏移量。

有三种确认模式:

  1. 自动提交(auto-commit):消费者收到消息后立即发送确认
  2. 处理后确认(after-processing):消费者成功处理消息后才发送确认
  3. 手动确认(manual):消费者收到特定指令后才发送确认

确认模式决定了消费者如何处理从Kafka集群读取的消息。

创建一个新的ConcurrentKafkaListenerContainerFactory Bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

可配置的确认模式包括:

  1. AckMode.RECORD:处理后确认模式,每处理一条消息发送一次确认
  2. AckMode.BATCH:手动模式,按批次确认而非单条消息
  3. AckMode.COUNT:手动模式,处理指定数量消息后确认
  4. AckMode.MANUAL:手动模式,不自动发送确认
  5. AckMode.TIME:手动模式,指定时间间隔后确认

要在Kafka中实现消息处理重试逻辑,需选择合适的AckMode

AckMode应允许消费者向代理明确指示哪些消息已成功处理。 这样代理可将未确认的消息重新投递给其他消费者。

阻塞重试场景下,通常使用RECORDMANUAL模式。

4. 阻塞重试

阻塞重试允许消费者在因临时错误导致首次消费失败时重试。消费者在重试前会等待一段时间(称为重试退避期)。

此外,消费者可通过固定延迟或指数退避策略自定义重试间隔,还可设置最大重试次数,超过后标记消息为失败。

4.1. 错误处理器

在Kafka配置类中定义两个属性:

@Value(value = "${kafka.backoff.interval}")
private Long interval;

@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;

定义错误处理器处理消费过程中的所有异常:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // 所有重试尝试耗尽时执行的逻辑
    }, fixedBackOff);
    return errorHandler;
}

FixedBackOff类接受两个参数:

  • interval:重试间隔时间(毫秒)
  • maxAttempts:放弃前的最大重试次数

此策略中,消费者在重试前等待固定时间。

DefaultErrorHandler通过Lambda函数初始化,该函数定义所有重试尝试耗尽时的执行逻辑。

Lambda函数接受两个参数:

  • consumerRecord:导致错误的Kafka记录
  • exception:抛出的异常

4.2. 容器工厂

将错误处理器添加到容器工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

如果存在重试策略,需将确认模式设为AckMode.RECORD,确保处理出错时消息能被重新投递。

❌ 不应使用AckMode.BATCHAckMode.TIME,因为消费者会批量确认消息。处理单条消息出错时,整个批次或时间窗口内的消息不会重新投递给自身。

这会导致重试策略无法正确处理错误。

4.3. 可重试异常与不可重试异常

可指定哪些异常可重试,哪些不可重试。

修改ErrorHandler

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // 所有重试尝试耗尽时执行的逻辑
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

这里指定了哪些异常类型应触发消费者的重试策略。

SocketTimeoutException被视为可重试,而NullPointerException被视为不可重试。

若未设置可重试异常,将使用默认集合:

4.4. 优缺点分析

阻塞重试中,消息处理失败时,消费者会阻塞直到重试完成或达到最大重试次数。

优点

  • 提升可靠性:允许消费者在出错时重试,确保消息最终成功处理
  • 简化实现:将重试机制抽象化,消费者只需关注消息处理逻辑

⚠️ 缺点

  • 引入延迟:消费者需等待重试完成,可能影响整体性能
  • 资源消耗:重试期间占用CPU和内存资源,影响系统可扩展性

5. 非阻塞重试

非阻塞重试允许消费者异步重试消息消费,不阻塞监听器方法执行。

5.1. @RetryableTopic注解

KafkaListener上添加@RetryableTopic注解:

@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L), 
      attempts = "5", 
      autoCreateTopics = "false",
      include = SocketTimeoutException.class, exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}

通过修改属性自定义重试行为:

  • backoff:重试时的退避策略
  • attempts:放弃前的最大重试次数
  • autoCreateTopics:是否自动创建重试主题和DLT(死信主题)
  • include:触发重试的异常类型
  • exclude:不触发重试的异常类型

当消息无法投递到目标主题时,会自动发送到重试主题。若达到最大重试次数仍未成功,则发送到DLT进行后续处理。

5.2. 优缺点分析

优点

  • 性能提升:不阻塞调用线程,提高应用整体性能
  • 可靠性增强:应用能从故障中恢复并继续处理消息

⚠️ 缺点

  • 复杂度增加:需处理重试逻辑和DLT
  • 消息重复风险:若原始投递和重试均成功,可能导致消息重复
  • 消息顺序问题:重试消息异步发送,可能晚于非重试消息投递

6. 总结

本文分析了在Kafka主题中实现重试逻辑的方案,包括阻塞和非阻塞两种方法。

完整示例代码可在GitHub获取。


原始标题:Implementing Retry in Kafka Consumer | Baeldung