1. Overview

Apache Kafka is a distributed, fault-tolerant stream-processing system. In this article, we'll cover Spring's support for Kafka and how it simplifies the use of the native Kafka Java client APIs with an abstraction layer.

Spring Kafka provides two core abstractions:

  • KafkaTemplate: implements the typical Spring template programming model and simplifies sending messages
  • @KafkaListener: enables POJO-based, message-driven programming; methods become message listeners via the annotation

2. What Is a Listener Container in Spring Kafka?

The Spring Framework implements the Inversion of Control (IoC) principle through dependency injection (DI): objects define their dependencies, and the IoC container injects those dependencies when it creates the beans. A bean is an object that is instantiated, assembled, and managed by the Spring IoC container.

In the Kafka context, a listener container is a container that holds a Kafka message consumer (the listener). Spring for Apache Kafka uses a container factory to create message listener containers. We use the @KafkaListener annotation to mark a bean method as a message listener for a listener container, and the container factory creates a listener container for every @KafkaListener-annotated bean method.

Spring Kafka provides the following core interfaces and classes for managing listener containers:

  • MessageListenerContainer: the base interface for Kafka message listener containers
  • KafkaMessageListenerContainer: an implementation that receives messages on a single thread
  • ConcurrentMessageListenerContainer: an implementation that delegates to one or more KafkaMessageListenerContainer instances, based on the configured concurrency
  • KafkaListenerContainerFactory: a factory for creating MessageListenerContainer instances
  • ConcurrentKafkaListenerContainerFactory: an implementation that creates ConcurrentMessageListenerContainer instances
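To make the hierarchy concrete, here is a minimal sketch of wiring a single-threaded KafkaMessageListenerContainer by hand, without @KafkaListener (broker address, group ID, and topic name are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class ManualContainerSketch {

    public static KafkaMessageListenerContainer<String, String> build() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

        // ContainerProperties carries the subscribed topics and the listener callback
        ContainerProperties containerProps = new ContainerProperties("baeldung");
        containerProps.setMessageListener((MessageListener<String, String>) record ->
            System.out.println("Received: " + record.value()));

        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }
}
```

Calling start() on the returned container begins polling; this wiring is essentially what the container factory does for us behind @KafkaListener.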

3. Installation and Setup

3.1. Prerequisites

To download and install Kafka, please refer to the official quickstart guide. This article assumes a server is started with the default configuration and that no ports are changed.
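For reference, starting a local single-node broker typically looks like the following (a sketch assuming a Kafka 3.x distribution running in KRaft mode; file paths vary by version):

```shell
# Generate a cluster ID and format the storage directory (KRaft mode)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties

# Start the broker with the default configuration (listens on port 9092)
bin/kafka-server-start.sh config/kraft/server.properties
```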

3.2. Maven Dependencies

Add the spring-kafka dependency to the pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.1</version>
</dependency>

Next, configure the spring-boot-maven-plugin:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
        <mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
    </configuration>
</plugin>

⚠️ The latest version of the dependency can be found in Maven Central.

4. Configuring Topics

4.1. The Legacy Way (Not Recommended)

Previously, we created topics with command-line tools:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic
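On current Kafka versions (2.2 and later), the same tool talks to the broker directly via --bootstrap-server instead of going through ZooKeeper:

```shell
$ bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic
```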

4.2. Programmatic Creation (Recommended)

Since the introduction of AdminClient in Kafka, we can create topics programmatically. We add the KafkaAdmin Spring bean, which automatically creates a topic for every bean of type NewTopic:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
        return new NewTopic("baeldung", 1, (short) 1);
    }
}

5. Producing Messages

5.1. Producer Configuration

First, we configure a ProducerFactory, which sets the strategy for creating Kafka Producer instances. Then we create a KafkaTemplate, which wraps a Producer instance and provides convenience methods for sending messages.

Producer instances are thread-safe, and using a single instance throughout an application context yields higher performance. Consequently, KafkaTemplate instances are also thread-safe.

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

5.2. Publishing Messages

We can send messages using the KafkaTemplate class:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

The send API returns a CompletableFuture object. If we want to block the sending thread and get the result of the sent message, we can call the get() method of the future, but waiting on the result reduces producer throughput.
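A minimal sketch of blocking for the result, assuming the kafkaTemplate and topicName fields from above (the timeout value is illustrative):

```java
public void sendMessageBlocking(String msg) throws Exception {
    // Blocks the calling thread until the broker acknowledges the record,
    // trading throughput for an immediate result
    SendResult<String, String> result = kafkaTemplate.send(topicName, msg)
      .get(10, TimeUnit.SECONDS);
    System.out.println("Sent with offset=[" + result.getRecordMetadata().offset() + "]");
}
```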

⚠️ Kafka is a fast stream-processing platform; it's better to handle results asynchronously so that subsequent messages don't wait for the result of the previous one.

We can do this through a callback:

public void sendMessage(String message) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        } else {
              System.out.println("Unable to send message=[" + 
                message + "] due to : " + ex.getMessage());
          }
    });
}

6. Consuming Messages

6.1. Consumer Configuration

For consuming messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. Adding the @EnableKafka annotation on the configuration class enables detection of @KafkaListener annotations on Spring-managed beans:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

6.2. Basic Consumption

We can consume messages by creating POJO listeners with @KafkaListener:

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Multiple listeners can consume the same topic, each with a different group ID, and a single listener can also subscribe to several topics:

@KafkaListener(topics = { "topic1", "topic2" }, groupId = "foo")
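A second listener on the same topic in a different consumer group might look like this (both groups receive every message independently; topic and group names are illustrative):

```java
@KafkaListener(topics = "topicName", groupId = "bar")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group bar: " + message);
}
```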

Spring also supports retrieving message headers in the listener with the @Header annotation:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      System.out.println(
        "Received Message: " + message +
        " from partition: " + partition);
}

6.3. Consuming From a Specific Partition

For a topic with multiple partitions, @TopicPartition lets us specify the partitions and initial offsets for a listener:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    System.out.println(
      "Received Message: " + message +
      " from partition: " + partition);
}

⚠️ Since initialOffset is 0, every time this listener initializes, it re-consumes all previously consumed messages from partitions 0 and 3.

If we don't need to set the offset, we can specify only the partitions:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

6.4. Adding a Message Filter for Listeners

We can configure listeners to consume only specific message content by adding a custom filter via RecordFilterStrategy:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

We then wire a listener to use this container factory:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

In this listener, all messages matching the filter (here, any record whose value contains "World") are discarded before they reach the listener.
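The inverted semantics are worth noting: the strategy returns true for records to drop, not to keep. As a plain-Java sketch of that contract (no Kafka types involved; names are illustrative):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSemanticsDemo {

    // Mirrors RecordFilterStrategy: true means "discard this record"
    static final Predicate<String> FILTER = value -> value.contains("World");

    // Returns only the values that would actually reach the listener
    static List<String> delivered(List<String> incoming) {
        return incoming.stream()
          .filter(value -> !FILTER.test(value))
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(delivered(List.of("Hello World", "Hello Baeldung")));
        // prints [Hello Baeldung]
    }
}
```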

7. Custom Message Converters

7.1. Sending Custom Messages

We define the message object:

public class Greeting {
    private String msg;
    private String name;
    // standard constructors/getters/setters
}

Then, we configure the ProducerFactory to use JsonSerializer:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

With this, we can send custom objects:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

7.2. Consuming Custom Messages

Similarly, we configure the consumer to use JsonDeserializer:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

We also need to add the Jackson dependency, which is an optional dependency of spring-kafka:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.18.2</version>
</dependency>

⚠️ It is recommended to use a Jackson version compatible with your spring-kafka version.

Now we can listen for custom objects:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process Greeting message
}

8. Multi-Method Listeners

8.1. Type Mapping in the Producer

We define a new message type:

public class Farewell {
    private String message;
    private Integer remainingMinutes;
    // standard constructors/getters/setters
}

We configure JSON type mappings:

configProps.put(JsonSerializer.TYPE_MAPPINGS, 
  "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");

The full producer configuration:

@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.TYPE_MAPPINGS, 
      "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
    return new KafkaTemplate<>(multiTypeProducerFactory());
}

We can then send messages of multiple types on the same topic:

multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");

8.2. A Message Converter in the Consumer

We configure a custom MessageConverter:

@Bean
public RecordMessageConverter multiTypeConverter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("greeting", Greeting.class);
    mappings.put("farewell", Farewell.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}

And the consumer factory:

@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
    HashMap<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(multiTypeConsumerFactory());
    factory.setRecordMessageConverter(multiTypeConverter());
    return factory;
}

8.3. Implementing the Multi-Method Listener

We use @KafkaHandler to process messages of different types:

@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {

    @KafkaHandler
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }

    @KafkaHandler
    public void handleFarewell(Farewell farewell) {
        System.out.println("Farewell received: " + farewell);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Unknown type received: " + object);
    }
}

The method marked with isDefault = true acts as the default handler and processes messages whose type matches no other handler.

9. Conclusion

In this article, we covered the essentials of Spring support for Apache Kafka, including:

  • ✅ Listener container concepts and configuration
  • ✅ Programmatic topic creation
  • ✅ Producing and consuming messages (including partition control)
  • ✅ Custom message converters
  • ✅ Handling multiple message types

Before running the examples, make sure that:

  1. The Kafka server is running
  2. The required topics have been created (manually or programmatically)

⚠️ In production, it is advisable to manage topics manually to avoid the potential risks of programmatic creation.


Original title: Intro to Apache Kafka with Spring | Baeldung