1. 引言

Apache Kafka 是一个分布式流处理平台,它允许我们发布和订阅记录流,通常称为消息。此外,Kafka 头部提供了一种附加元数据到 Kafka 消息的方式,增强了消息处理的上下文和灵活性。

在这个教程中,我们将深入探讨常用的 Kafka 头部,并学习如何使用 Java 查看和提取它们。

2. Kafka 头部概述

Kafka 头部代表附在 Kafka 消息上的键值对,提供了一种在主要消息内容旁边包含补充元数据的方法。

例如,Kafka 头部通过提供用于将消息导向特定处理管道或消费者的的数据,促进了消息路由。此外,头部非常适合承载自定义应用程序元数据,以适应应用程序的处理逻辑。

3. Kafka 默认头部

Kafka 生产者会在发送的消息中自动包含几个默认头部。 这些头部提供了关于消息的关键元数据和上下文。在本节中,我们将深入了解一些常用头部及其在 Kafka 消息处理领域的重要性。

3.1. 生产者头部

当在 Kafka 中创建消息时,生产者会自动包含一些默认头部,如:

  • KafkaHeaders.TOPIC – 这个头部包含消息所属的主题名称。
  • KafkaHeaders.KEY – 如果消息带有键,Kafka 会自动添加一个名为“key”的头部,包含序列化的键字节。
  • KafkaHeaders.PARTITION – Kafka 会添加一个名为“partition”的头部,表示消息属于的分区 ID。
  • KafkaHeaders.TIMESTAMP – Kafka 会给每个消息添加一个名为“timestamp”的头部,表示消息由生产者产生的时间戳。

3.2. 消费者头部

带有 RECEIVED_ 前缀的头部由 Kafka 消费者在接收消息时添加,提供关于消息接收过程的元数据:

  • KafkaHeaders.RECEIVED_TOPIC – 这个头部包含消息接收的主题名称。
  • KafkaHeaders.RECEIVED_KEY – 这个头部使消费者能够访问与消息关联的键。
  • KafkaHeaders.RECEIVED_PARTITION – Kafka 会添加这个头部,指示消息被分配到的分区 ID。
  • KafkaHeaders.RECEIVED_TIMESTAMP – 这个头部反映了消费者接收消息的时间。
  • KafkaHeaders.OFFSET – 位置指示消息在分区日志中的位置。

4. 带有头部消费消息

首先,我们实例化一个 KafkaConsumer 对象。KafkaConsumer 负责订阅 Kafka 主题并从中获取消息。在实例化 KafkaConsumer 后,我们订阅我们想要消费消息的主题。通过订阅主题,消费者可以接收该主题上发布的消息。

一旦消费者订阅了主题,我们继续从 Kafka 中获取记录。在这个过程中,KafkaConsumer 从订阅的主题中检索消息,包括它们相关的头部。

以下代码示例演示如何带有头部消费消息:

@KafkaListener(topics = "my-topic")
public void listen(String message, @Headers Map<String, Object> headers) {
    System.out.println("Received message: " + message);
    System.out.println("Headers:");
    headers.forEach((key, value) -> System.out.println(key + ": " + value));
}

当从指定主题(如“my-topic”)接收到消息时,Kafka 监听容器会调用 listen() 方法。**@Headers 注解表示参数应填充接收到消息的头部。**

以下是示例输出:

Received message: Hello Baeldung!
Headers:
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: my-topic
kafka_offset: 123
... // other headers

要访问特定头部,我们可以使用头部映射的 get() 方法,提供所需的头部键。以下是一个访问主题名称的例子:

String topicName = headers.get(KafkaHeaders.TOPIC);

topicName 应返回 my-topic

此外,当我们消费消息时,如果我们已经知道需要的头部,可以直接作为方法参数提取头部。 这种方法提供了更简洁、针对性更强的方式来访问特定头部值,而无需遍历所有头部。

以下代码示例演示如何带有头部消费消息,直接提取特定头部作为方法参数:

@KafkaListener(topics = "my-topic")
public void listen(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println("Received message: " + message);
    System.out.println("Partition: " + partition);
}

listen() 方法中,我们直接使用 @Header 注解提取 RECEIVED_PARTITION 头部。这个注解允许我们指定要提取的头部及其对应的类型。将头部值直接注入方法参数(在这种情况下,partition)使我们可以在方法体内部直接访问。

以下是输出:

Received message: Hello Baeldung!
Partition: 0

5. 总结

在这篇文章中,我们探讨了 Apache Kafka 中消息处理中的 Kafka 头部的重要性。我们详细了解了生产者和消费者自动包含的默认头部。此外,我们还学习了如何查看和操作这些头部。

如往常一样,示例代码可以在 GitHub 上找到。