1. 概述
在这个教程中,我们将学习如何使用 Kafka 订阅消费者到多个主题。当相同的业务逻辑应用于多个主题时,这是常见的需求。
2. 创建模型类
我们以一个简单的支付系统为例,它有两个Kafka主题:一个用于信用卡支付,另一个用于银行转账。让我们创建模型类:
public class PaymentData {
private String paymentReference;
private String type;
private BigDecimal amount;
private Currency currency;
// standard getters and setters
}
3. 使用Kafka消费者API订阅多个主题
首先,我们将讨论使用Kafka消费者API的方法。请添加必要的Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
同时,我们也需要配置Kafka消费者:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);
在消费消息之前,我们需要使用subscribe()
方法将kafkaConsumer
订阅到两个主题:
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
现在我们可以测试我们的配置了。在每个主题上发布一条消息:
void publishMessages() throws Exception {
ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments",
"{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
kafkaProducer.send(cardPayment).get();
ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
"{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
kafkaProducer.send(bankTransfer).get();
}
最后,我们可以编写集成测试:
@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
publishMessages();
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
int eventsProcessed = 0;
for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
log.info("Event on topic={}, payload={}", record.topic(), record.value());
eventsProcessed++;
}
assertThat(eventsProcessed).isEqualTo(2);
}
4. 使用Spring Kafka订阅多个主题
接下来,我们将讨论使用Spring Kafka的方法。
请在pom.xml
中添加spring-kafka
和jackson-databind
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
还需要定义ConsumerFactory
和ConcurrentKafkaListenerContainerFactory
bean:
@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
List<String, String> config = new HashMap<>();
config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
使用@KafkaListener
注解的topics
属性订阅到两个主题:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
最后,我们可以创建消费者。此外,我们还会包含Kafka头信息,以便识别消息接收的主题:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on topic={}, payload={}", topic, paymentData);
}
请验证我们的配置:
@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
countDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePaymentEvents(any(), any());
kafkaTemplate.send("card-payments", createCardPayment());
kafkaTemplate.send("bank-transfers", createBankTransfer());
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
5. 使用Kafka CLI订阅多个主题
我们将讨论的最后一个方法是使用Kafka CLI。
首先,我们在每个主题上发送一条消息:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
现在,我们可以启动Kafka CLI消费者。include
选项允许我们指定消息消费的主题列表:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"
运行该命令后,输出如下:
{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
6. 总结
在这篇文章中,我们学习了三种不同的方法来订阅Kafka消费者到多个主题。这对于为多个主题实现相同功能非常有用。
前两种方法基于Kafka消费者API和Spring Kafka,可以与现有应用集成。最后一种使用Kafka CLI,可以快速验证多个主题。
如往常一样,完整的代码可以在GitHub上找到。