1. 概述

在这个教程中,我们将学习如何使用 Kafka 订阅消费者到多个主题。当相同的业务逻辑应用于多个主题时,这是常见的需求。

2. 创建模型类

我们以一个简单的支付系统为例,它有两个Kafka主题:一个用于信用卡支付,另一个用于银行转账。让我们创建模型类:

public class PaymentData {
    private String paymentReference;
    private String type;
    private BigDecimal amount;
    private Currency currency;

    // standard getters and setters
}

3. 使用Kafka消费者API订阅多个主题

首先,我们将讨论使用Kafka消费者API的方法。请添加必要的Maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

同时,我们也需要配置Kafka消费者:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);

在消费消息之前,我们需要使用subscribe()方法将kafkaConsumer订阅到两个主题:

kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

现在我们可以测试我们的配置了。在每个主题上发布一条消息:

void publishMessages() throws Exception {
    ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments", 
      "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
    kafkaProducer.send(cardPayment).get();
    
    ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
      "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
    kafkaProducer.send(bankTransfer).get();
}

最后,我们可以编写集成测试:

@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    publishMessages();
    kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

    int eventsProcessed = 0;
    for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
        log.info("Event on topic={}, payload={}", record.topic(), record.value());
        eventsProcessed++;
    }
    assertThat(eventsProcessed).isEqualTo(2);
}

4. 使用Spring Kafka订阅多个主题

接下来,我们将讨论使用Spring Kafka的方法。

请在pom.xml中添加spring-kafkajackson-databind依赖:

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

还需要定义ConsumerFactoryConcurrentKafkaListenerContainerFactory bean:

@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
    List<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(
      config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

使用@KafkaListener注解的topics属性订阅到两个主题:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")

最后,我们可以创建消费者。此外,我们还会包含Kafka头信息,以便识别消息接收的主题:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
  PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on topic={}, payload={}", topic, paymentData);
}

验证我们的配置

@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    doAnswer(invocation -> {
        countDownLatch.countDown();
        return null;
    }).when(paymentsConsumer)
      .handlePaymentEvents(any(), any());

    kafkaTemplate.send("card-payments", createCardPayment());
    kafkaTemplate.send("bank-transfers", createBankTransfer());

    assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}

5. 使用Kafka CLI订阅多个主题

我们将讨论的最后一个方法是使用Kafka CLI。

首先,我们在每个主题上发送一条消息:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

现在,我们可以启动Kafka CLI消费者。include选项允许我们指定消息消费的主题列表:

$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"

运行该命令后,输出如下:

{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

6. 总结

在这篇文章中,我们学习了三种不同的方法来订阅Kafka消费者到多个主题。这对于为多个主题实现相同功能非常有用。

前两种方法基于Kafka消费者API和Spring Kafka,可以与现有应用集成。最后一种使用Kafka CLI,可以快速验证多个主题。

如往常一样,完整的代码可以在GitHub上找到。