1. 概述

在这篇文章中,我们将通过一个实际例子学习如何为同一个Kafka主题配置多个监听器。

如果你是第一次在Spring中配置Kafka,可以从我们的Apache Kafka与Spring的入门教程开始。

2. 项目设置

让我们构建一个图书消费者服务,它监听图书馆中新出现的图书,并为全文搜索、价格索引或用户通知等不同目的消费这些图书。

首先,创建一个Spring Boot服务并使用spring-kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

此外,定义听众将消费的BookEvent对象:

public class BookEvent {

    private String title;
    private String description;
    private Double price;

    //  standard constructors, getters and setters
}

3. 生产消息

Apache Kafka生产者对生态系统至关重要,因为它们向Kafka集群写入消息。考虑到这一点,首先需要定义一个生产者,它将在后续被消费者应用消费的主题上写入消息。

按照我们的示例,编写一个简单的Kafka生产者函数,将新的BookEvent对象写入“books”主题。

private static final String TOPIC = "books";

@Autowired
private KafkaTemplate<String, BookEvent> bookEventKafkaTemplate;

public void sentBookEvent(BookEvent book){
    bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), book);
}

4. 多个监听器消费同一Kafka主题

Kafka消费者是订阅Kafka集群一个或多个主题的客户端应用程序。接下来,我们将探讨如何在同一主题上设置多个监听器。

4.1. 消费者配置

首先,为了配置一个消费者,我们需要定义听众所需的ConcurrentKafkaListenerContainerFactory Bean。

现在,我们来定义将用于消费BookEvent对象的容器工厂:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BookEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BookEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, BookEvent> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        
        // required consumer factory properties

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

接下来,我们将研究接收传入消息的不同策略。

4.2. 同一消费者组内的多个监听器

在一个消费者组内添加多个监听器的一种策略是在同一组内增加并发级别。因此,我们可以在@KafkaListener注解中直接指定这个值。

为了理解这是如何工作的,让我们为我们的图书馆定义一个通知监听器:

@KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2")
public void bookNotificationConsumer(BookEvent event) {
    logger.info("Books event received for notification => {}", event);
}

接下来,发布三条消息后查看控制台输出。此外,让我们理解为什么这些消息只被消费一次:

Books event received for notification => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for notification => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for notification => BookEvent(title=book 3, description=description 3, price=3.0)

这是因为,内部地,对于每个并发级别,Kafka会在同一消费者组内实例化一个新的监听器。而且,同一组内所有监听器实例的范围是将消息分发给彼此,以更快地完成工作并提高吞吐量。

4.3. 不同消费者组的多个监听器

如果我们需要对同一消息进行多次消费,并为每个监听器应用不同的处理逻辑,我们需要将@KafkaListenergroupId属性设置为唯一。这样,Kafka会为每个监听器创建专用的消费者组,并将发布的所有消息推送给每个监听器。

让我们在实际操作中看到这种方法,定义一个负责全文搜索索引的监听器和一个负责价格索引的监听器。两者都监听同一个“books”主题:

@KafkaListener(topics = "books", groupId = "books-content-search")
public void bookContentSearchConsumer(BookEvent event) {
    logger.info("Books event received for full-text search indexing => {}", event);
}

@KafkaListener(topics = "books", groupId = "books-price-index")
public void bookPriceIndexerConsumer(BookEvent event) {
    logger.info("Books event received for price indexing => {}", event);
}

现在运行上面的代码并分析输出:

Books event received for price indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for price indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for full-text search indexing => BookEvent(title=book 3, description=description 3, price=3.0)
Books event received for price indexing => BookEvent(title=book 3, description=description 3, price=3.0)

正如我们所见,两个监听器都会接收每个BookEvent并为所有接收到的消息应用独立的处理逻辑。

5. 何时使用不同的监听器策略

如前所学,我们可以通过在@KafkaListener注解的concurrency属性中设置大于1的值,或者定义多个监听@KafkaListener方法,这些方法监听同一Kafka主题并分配不同的消费者ID,来设置多个监听器。

选择哪种策略取决于我们的目标。只要我们关注性能,通过更快地处理消息来提高吞吐量,正确的策略是在同一消费者组内增加监听器的数量。

然而,为了满足不同的需求处理同一消息多次,我们应该定义具有独特消费者组的专用监听器,它们监听相同的主题。

一般来说,我们应该为每个需要满足的需求使用一个消费者组,如果需要让那个监听器更快,可以在同一消费者组内增加监听器的数量。

6. 总结

在这篇文章中,我们学习了如何使用Spring Kafka库为同一主题配置多个监听器,通过一个图书图书馆的实际例子。我们从生产者和消费者配置开始,然后继续探讨在同一主题上添加多个监听器的不同方法。

如往常一样,示例的完整源代码可以在GitHub上找到。