1. 引言

Apache Kafka 是一个开源分布式事件存储和容错流处理系统。Kafka本质上是一个事件流平台,客户端可以在其中发布和订阅事件流。通常,生产者应用程序将事件发布到Kafka,而消费者订阅这些事件,从而实现发布者-订阅者模型。

在这个教程中,我们将学习如何使用Kafka生产者在Kafka消息中添加自定义头。

2. 设置

Kafka提供了一个易于使用的Java库,我们可以用它来创建Kafka生产者客户端(Producer)和消费者客户端(Consumers)。

2.1. 依赖项

首先,让我们在项目的pom.xml文件中添加Kafka客户端Java库的Maven依赖项

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

2.2. 连接初始化

本指南假设我们在本地系统上运行了一个Kafka集群。此外,我们需要创建一个主题,并与Kafka集群建立连接。

首先,我们通过参考我们的Kafka主题创建指南,创建一个名为“baeldung”的主题。

其次,让我们创建一个新的Properties实例,用于配置生产者连接到本地代理的最低必要配置:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

最后,让我们创建一个KafkaProducer实例,我们将使用它来发布消息:

KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);

KafkaProducer类的构造函数接受一个包含*bootstrap.servers*属性的Properties对象(或Map),并返回一个KafkaProducer实例。

类似地,让我们创建一个KafkaConsumer实例,我们将使用它来消费消息:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

我们将使用这些生产者和消费者实例来演示所有的编程示例。

现在,我们已经配置了所有必要的依赖项和连接,我们可以编写一个简单的应用程序,向Kafka消息中添加自定义头。

3. 使用自定义头发布消息

Kafka消息中的自定义头支持是在Kafka版本0.11.0.0中添加的。要创建一个Kafka消息(Record),我们创建一个ProducerRecord<K, V>实例。ProducerRecord基本上标识了要发布的消息值和主题,以及其他元数据。

ProducerRecord类提供了各种构造函数,用于向Kafka消息添加自定义头。让我们看看可以使用的几个构造函数:

  • ProducerRecord<String, Integer, K, V> topic, Integer partition, K key, V value, Iterable<Header> headers)
  • ProducerRecord<String, Integer, Long, K, V> topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

两个ProducerRecord类的构造函数都接受自定义头的形式为Iterable<Header>类型。

为了理解,让我们创建一个ProducerRecord,它将一个带有自定义头的消息发布到“baeldung”主题:

List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, "message", "Hello World", headers);

producer.send(record);

在这里,我们正在创建一个Header类型的List,以作为构造函数的头传递。每个头表示一个RecordHeader(String key, byte[] value)实例,它接受一个头键作为String和一个头值作为byte数组。

同样,我们可以使用第二个构造函数,它还接受记录发布的时戳:

List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers);

producer.send(record);

到目前为止,我们已经创建了一个带有自定义头的消息,并将其发布到Kafka。

接下来,让我们实现消费者代码,以消费消息并验证其自定义头。

4. 使用自定义头消费消息

首先,我们将消费者实例订阅到Kafka主题“baeldung”,以便从其中消费消息:

consumer.subscribe(Arrays.asList("baeldung"));

其次,我们使用轮询机制从Kafka中获取新消息:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

KafkaConsumer.poll(Duration duration)方法根据Duration参数指定的时间,在Kafka主题中轮询新消息。该方法返回一个包含获取的消息的ConsumerRecords实例。ConsumerRecords基本上是ConsumerRecord类型的可迭代实例。

最后,我们遍历获取的记录,并获取每条消息的自定义头:

for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.key());
    System.out.println(record.value());

    Headers consumedHeaders = record.headers();
    for (Header header : consumedHeaders) {
        System.out.println(header.key());
        System.out.println(new String(header.value()));
    }
}

这里,我们使用ConsumerRecord类的各种getter方法来获取消息键、值和自定义头。ConsumerRecord.headers()方法返回一个包含自定义头的Headers实例。Headers基本上是Header类型的可迭代实例。然后,我们遍历每个Header实例,并使用Header.key()Header.value()方法分别获取头键和值。

5. 总结

在这篇文章中,我们学习了如何在Kafka消息中添加自定义头。我们查看了接受自定义头的不同构造函数及其相应的实现。

然后,我们看到了如何消费带有自定义头的消息并验证它们。

一如既往,所有示例的完整代码可在GitHub上找到。