1. 引言
Apache Kafka 是一个开源分布式事件存储和容错流处理系统。Kafka本质上是一个事件流平台,客户端可以在其中发布和订阅事件流。通常,生产者应用程序将事件发布到Kafka,而消费者订阅这些事件,从而实现发布者-订阅者模型。
在这个教程中,我们将学习如何使用Kafka生产者在Kafka消息中添加自定义头。
2. 设置
Kafka提供了一个易于使用的Java库,我们可以用它来创建Kafka生产者客户端(Producer)和消费者客户端(Consumers)。
2.1. 依赖项
首先,让我们在项目的pom.xml
文件中添加Kafka客户端Java库的Maven依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2.2. 连接初始化
本指南假设我们在本地系统上运行了一个Kafka集群。此外,我们需要创建一个主题,并与Kafka集群建立连接。
首先,我们通过参考我们的Kafka主题创建指南,创建一个名为“baeldung”的主题。
其次,让我们创建一个新的Properties
实例,用于配置生产者连接到本地代理的最低必要配置:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
最后,让我们创建一个KafkaProducer
实例,我们将使用它来发布消息:
KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);
KafkaProducer
类的构造函数接受一个包含*bootstrap.servers*
属性的Properties
对象(或Map
),并返回一个KafkaProducer
实例。
类似地,让我们创建一个KafkaConsumer
实例,我们将使用它来消费消息:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
我们将使用这些生产者和消费者实例来演示所有的编程示例。
现在,我们已经配置了所有必要的依赖项和连接,我们可以编写一个简单的应用程序,向Kafka消息中添加自定义头。
3. 使用自定义头发布消息
Kafka消息中的自定义头支持是在Kafka版本0.11.0.0中添加的。要创建一个Kafka消息(Record),我们创建一个ProducerRecord<K, V>
实例。ProducerRecord
基本上标识了要发布的消息值和主题,以及其他元数据。
ProducerRecord
类提供了各种构造函数,用于向Kafka消息添加自定义头。让我们看看可以使用的几个构造函数:
-
ProducerRecord<String, Integer, K, V> topic, Integer partition, K key, V value, Iterable<Header> headers)
-
ProducerRecord<String, Integer, Long, K, V> topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
两个ProducerRecord
类的构造函数都接受自定义头的形式为Iterable<Header>
类型。
为了理解,让我们创建一个ProducerRecord
,它将一个带有自定义头的消息发布到“baeldung”主题:
List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, "message", "Hello World", headers);
producer.send(record);
在这里,我们正在创建一个Header
类型的List
,以作为构造函数的头传递。每个头表示一个RecordHeader(String key, byte[] value)
实例,它接受一个头键作为String
和一个头值作为byte
数组。
同样,我们可以使用第二个构造函数,它还接受记录发布的时戳:
List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers);
producer.send(record);
到目前为止,我们已经创建了一个带有自定义头的消息,并将其发布到Kafka。
接下来,让我们实现消费者代码,以消费消息并验证其自定义头。
4. 使用自定义头消费消息
首先,我们将消费者实例订阅到Kafka主题“baeldung”,以便从其中消费消息:
consumer.subscribe(Arrays.asList("baeldung"));
其次,我们使用轮询机制从Kafka中获取新消息:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
KafkaConsumer.poll(Duration duration)
方法根据Duration
参数指定的时间,在Kafka主题中轮询新消息。该方法返回一个包含获取的消息的ConsumerRecords
实例。ConsumerRecords
基本上是ConsumerRecord
类型的可迭代实例。
最后,我们遍历获取的记录,并获取每条消息的自定义头:
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
Headers consumedHeaders = record.headers();
for (Header header : consumedHeaders) {
System.out.println(header.key());
System.out.println(new String(header.value()));
}
}
这里,我们使用ConsumerRecord
类的各种getter方法来获取消息键、值和自定义头。ConsumerRecord.headers()
方法返回一个包含自定义头的Headers
实例。Headers
基本上是Header
类型的可迭代实例。然后,我们遍历每个Header
实例,并使用Header.key()
和Header.value()
方法分别获取头键和值。
5. 总结
在这篇文章中,我们学习了如何在Kafka消息中添加自定义头。我们查看了接受自定义头的不同构造函数及其相应的实现。
然后,我们看到了如何消费带有自定义头的消息并验证它们。
一如既往,所有示例的完整代码可在GitHub上找到。