1. 概述
本文将探讨在Kafka中实现重试机制的重要性。我们将分析在Spring Boot中实现重试的多种方案,并学习提升Kafka消费者可靠性和弹性的最佳实践。
如果是首次在Spring中配置Kafka,建议先阅读Spring和Kafka入门文章。
2. 项目搭建
创建新的Spring Boot项目,添加spring-kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
创建一个实体类:
public class Greeting {
private String msg;
private String name;
// 标准构造器、getter和setter
}
3. Kafka消费者
Kafka消费者是从Kafka集群读取数据的客户端应用。它订阅一个或多个主题(topic),消费已发布的消息。生产者将消息发送到主题(存储和发布记录的类别)。主题被划分为多个分区(partition)以实现水平扩展。每个分区都是不可变的消息序列。
消费者可以通过指定偏移量(offset,即消息在分区中的位置)来读取特定分区的消息。确认(ack)是消费者发送给Kafka代理(broker)的消息,表示已成功处理记录。确认发送后,消费者偏移量才会更新。
这确保消息被消费,且不会再次传递给当前监听器。
3.1. 确认模式
确认模式决定代理何时更新消费者偏移量。
有三种确认模式:
- 自动提交(auto-commit):消费者收到消息后立即发送确认
- 处理后确认(after-processing):消费者成功处理消息后才发送确认
- 手动确认(manual):消费者收到特定指令后才发送确认
确认模式决定了消费者如何处理从Kafka集群读取的消息。
创建一个新的ConcurrentKafkaListenerContainerFactory
Bean:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 其他配置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}
可配置的确认模式包括:
- AckMode.RECORD:处理后确认模式,每处理一条消息发送一次确认
- AckMode.BATCH:手动模式,按批次确认而非单条消息
- AckMode.COUNT:手动模式,处理指定数量消息后确认
- AckMode.MANUAL:手动模式,不自动发送确认
- AckMode.TIME:手动模式,指定时间间隔后确认
要在Kafka中实现消息处理重试逻辑,需选择合适的AckMode。
该AckMode应允许消费者向代理明确指示哪些消息已成功处理。 这样代理可将未确认的消息重新投递给其他消费者。
阻塞重试场景下,通常使用RECORD或MANUAL模式。
4. 阻塞重试
阻塞重试允许消费者在因临时错误导致首次消费失败时重试。消费者在重试前会等待一段时间(称为重试退避期)。
此外,消费者可通过固定延迟或指数退避策略自定义重试间隔,还可设置最大重试次数,超过后标记消息为失败。
4.1. 错误处理器
在Kafka配置类中定义两个属性:
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
定义错误处理器处理消费过程中的所有异常:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
// 所有重试尝试耗尽时执行的逻辑
}, fixedBackOff);
return errorHandler;
}
FixedBackOff类接受两个参数:
- interval:重试间隔时间(毫秒)
- maxAttempts:放弃前的最大重试次数
此策略中,消费者在重试前等待固定时间。
DefaultErrorHandler通过Lambda函数初始化,该函数定义所有重试尝试耗尽时的执行逻辑。
Lambda函数接受两个参数:
- consumerRecord:导致错误的Kafka记录
- exception:抛出的异常
4.2. 容器工厂
将错误处理器添加到容器工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 其他配置
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}
如果存在重试策略,需将确认模式设为AckMode.RECORD,确保处理出错时消息能被重新投递。
❌ 不应使用AckMode.BATCH或AckMode.TIME,因为消费者会批量确认消息。处理单条消息出错时,整个批次或时间窗口内的消息不会重新投递给自身。
这会导致重试策略无法正确处理错误。
4.3. 可重试异常与不可重试异常
可指定哪些异常可重试,哪些不可重试。
修改ErrorHandler:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
// 所有重试尝试耗尽时执行的逻辑
}, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}
这里指定了哪些异常类型应触发消费者的重试策略。
SocketTimeoutException被视为可重试,而NullPointerException被视为不可重试。
若未设置可重试异常,将使用默认集合:
4.4. 优缺点分析
阻塞重试中,消息处理失败时,消费者会阻塞直到重试完成或达到最大重试次数。
✅ 优点:
- 提升可靠性:允许消费者在出错时重试,确保消息最终成功处理
- 简化实现:将重试机制抽象化,消费者只需关注消息处理逻辑
⚠️ 缺点:
- 引入延迟:消费者需等待重试完成,可能影响整体性能
- 资源消耗:重试期间占用CPU和内存资源,影响系统可扩展性
5. 非阻塞重试
非阻塞重试允许消费者异步重试消息消费,不阻塞监听器方法执行。
5.1. @RetryableTopic注解
在KafkaListener上添加@RetryableTopic注解:
@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {
@KafkaHandler
@RetryableTopic(
backoff = @Backoff(value = 3000L),
attempts = "5",
autoCreateTopics = "false",
include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
}
通过修改属性自定义重试行为:
- backoff:重试时的退避策略
- attempts:放弃前的最大重试次数
- autoCreateTopics:是否自动创建重试主题和DLT(死信主题)
- include:触发重试的异常类型
- exclude:不触发重试的异常类型
当消息无法投递到目标主题时,会自动发送到重试主题。若达到最大重试次数仍未成功,则发送到DLT进行后续处理。
5.2. 优缺点分析
✅ 优点:
- 性能提升:不阻塞调用线程,提高应用整体性能
- 可靠性增强:应用能从故障中恢复并继续处理消息
⚠️ 缺点:
- 复杂度增加:需处理重试逻辑和DLT
- 消息重复风险:若原始投递和重试均成功,可能导致消息重复
- 消息顺序问题:重试消息异步发送,可能晚于非重试消息投递
6. 总结
本文分析了在Kafka主题中实现重试逻辑的方案,包括阻塞和非阻塞两种方法。
完整示例代码可在GitHub获取。