1. 概述
Apache Kafka 是一个分布式、高容错的流处理系统。本文将介绍 Spring 对 Kafka 的支持,以及它如何通过抽象层简化原生 Kafka Java 客户端 API 的使用。
Spring Kafka 提供了两种核心抽象:
- KafkaTemplate:实现典型的 Spring 模板编程模型,简化消息发送
- @KafkaListener:支持基于 POJO 的消息驱动编程,通过注解实现消息监听
2. Spring Kafka 中的监听器容器是什么?
Spring 框架通过依赖注入 (DI) 实现控制反转 (IoC) 原则。对象直接定义其依赖关系,IoC 容器在创建 bean 时注入这些依赖。bean 是由 Spring IoC 容器实例化、组装和管理的对象。
在 Kafka 上下文中,监听器容器是包含 Kafka 消息消费者(监听器)的容器。Spring for Apache Kafka 使用容器工厂创建消息监听器容器。我们使用 @KafkaListener
注解将 bean 方法标记为监听器容器的消息监听器。容器工厂会为所有带 @KafkaListener
注解的 bean 方法创建监听器容器。
Spring Kafka 提供以下核心接口和类管理监听器容器:
MessageListenerContainer
:创建 Kafka 消息监听器容器的抽象接口KafkaMessageListenerContainer
:创建单线程消息监听器容器的实现类ConcurrentMessageListenerContainer
:基于并发度创建多个KafkaMessageListenerContainer
的实现类KafkaListenerContainerFactory
:MessageListenerContainer
的抽象工厂ConcurrentKafkaListenerContainerFactory
:创建ConcurrentMessageListenerContainer
的实现类
3. 安装与配置
3.1. 环境准备
下载安装 Kafka 请参考官方快速入门指南。本文假设服务器已使用默认配置启动,且未修改任何端口。
3.2. Maven 依赖
在 pom.xml
添加 spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.1</version>
</dependency>
配置 spring-boot-maven-plugin
:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
</configuration>
</plugin>
⚠️ 最新版本可在 Maven 中央仓库 查询
4. 配置 Topic
4.1. 传统方式(不推荐)
过去我们通过命令行工具创建 Topic:
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic mytopic
4.2. 程序化创建(推荐)
Kafka 引入 AdminClient
后,可通过代码创建 Topic。添加 KafkaAdmin
Spring bean,它会自动为所有 NewTopic
类型的 bean 创建 Topic:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("baeldung", 1, (short) 1);
}
}
5. 生产消息
5.1. 生产者配置
首先需要配置 ProducerFactory
,它定义创建 Kafka Producer
实例的策略。然后创建 KafkaTemplate
,它封装 Producer
实例并提供便捷的消息发送方法。
✅
Producer
实例是线程安全的,整个应用使用单例能获得更高性能,因此KafkaTemplate
也是线程安全的。
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5.2. 发送消息
使用 KafkaTemplate
发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
send
API 返回 CompletableFuture
对象。如果需要阻塞线程获取发送结果,可调用 get()
方法,但这会降低生产者吞吐量。
⚠️ Kafka 是高速流处理平台,建议异步处理结果避免消息堆积
通过回调实现异步处理:
public void sendMessage(String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
System.out.println("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
});
}
6. 消费消息
6.1. 消费者配置
需要配置 ConsumerFactory
和 KafkaListenerContainerFactory
。在配置类添加 @EnableKafka
注解以启用 @KafkaListener
检测:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
6.2. 基础消费
使用 @KafkaListener
创建 POJO 监听器:
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}
支持多监听器监听同一 Topic(需不同 groupId),也支持单监听器监听多 Topic:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
通过 @Header
获取消息头:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message +
"from partition: " + partition);
}
6.3. 消费指定分区
对于多分区 Topic,可通过 @TopicPartition
指定分区和初始偏移量:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message +
"from partition: " + partition);
}
⚠️ 设置
initialOffset=0
会导致监听器每次初始化时重新消费分区 0 和 3 的历史消息
仅指定分区不设置偏移量:
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
6.4. 消息过滤
通过 RecordFilterStrategy
配置消息过滤器:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}
监听器使用过滤工厂:
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}
此监听器会丢弃所有匹配过滤条件的消息
7. 自定义消息转换器
7.1. 发送自定义对象
定义消息对象:
public class Greeting {
private String msg;
private String name;
// 标准构造器/getter/setter
}
配置 ProducerFactory
使用 JsonSerializer
:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
发送自定义对象:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
7.2. 消费自定义对象
配置消费者使用 JsonDeserializer
:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
添加 Jackson 依赖(spring-kafka 的可选依赖):
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.2</version>
</dependency>
⚠️ 建议使用与 spring-kafka 兼容的 Jackson 版本
监听自定义对象:
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// 处理 Greeting 消息
}
8. 多方法监听器
8.1. 生产者类型映射
定义新消息类型:
public class Farewell {
private String message;
private Integer remainingMinutes;
// 标准构造器/getter/setter
}
配置 JSON 类型映射:
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
完整生产者配置:
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}
发送多种类型消息:
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");
8.2. 消费者消息转换器
配置自定义 MessageConverter
:
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
配置消费者工厂:
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setRecordMessageConverter(multiTypeConverter());
return factory;
}
8.3. 多方法监听器实现
使用 @KafkaHandler
处理不同类型消息:
@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {
@KafkaHandler
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
@KafkaHandler
public void handleF(Farewell farewell) {
System.out.println("Farewell received: " + farewell);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
System.out.println("Unkown type received: " + object);
}
}
✅
isDefault=true
标记的方法作为默认处理器,处理未匹配类型的消息
9. 总结
本文介绍了 Spring 对 Apache Kafka 的核心支持,包括:
- ✅ 监听器容器原理与配置
- ✅ Topic 程序化创建
- ✅ 消息生产与消费(含分区控制)
- ✅ 自定义消息转换器
- ✅ 多类型消息处理方案
运行前请确保:
- Kafka 服务已启动
- 相关 Topic 已创建(或使用程序化创建)
⚠️ 生产环境建议手动管理 Topic,避免程序化创建带来的潜在风险