1. 概述

Apache Kafka 是一个开源的分布式流处理系统,它具有容错能力和高吞吐量。Kafka 实质上是一个实现发布者-订阅者模型的消息系统。 它的 messaging、存储和流处理能力使我们能够大规模地存储和分析实时数据流。

在这个教程中,我们将首先了解在 Kafka 消息中键的重要性。然后,我们将学习如何将带有键的消息发布到 Kafka 主题。

2. Kafka 消息中键的重要作用

如我们所知,Kafka 有效地按我们生成记录的顺序存储一系列记录。

当我们向 Kafka 主题发布消息时,它们会轮询方式均匀分布到可用分区中。 因此,在 Kafka 主题内,分区内的消息顺序是有保证的,但跨分区则不保证。

当我们带有键的消息发布到 Kafka 主题时,所有具有相同键的消息都将被 Kafka 保证存储在同一分区中。因此,如果希望维护具有相同键的消息顺序,Kafka 消息中的键就很有用。

总结来说,发送消息到 Kafka 时,键并非强制要求。基本上,如果我们希望严格保持具有相同键的消息顺序,那么在消息中使用键是必要的。对于所有其他情况,拥有空键将更好地将消息分布在分区之间。

接下来,让我们立即深入到一些带有 Kafka 消息键的实际实现代码。

3. 设置

在开始之前,让我们先初始化一个 Kafka 集群,设置依赖项,并与 Kafka 集群建立连接。

Kafka 的 Java 库提供了易于使用的生产者和消费者 API,我们可以使用这些 API 来从 Kafka 发布和消费消息。

3.1. 依赖项

首先,我们需要在项目 的 pom.xml 文件中添加 Kafka 客户端 Java 库的 Maven 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

3.2. 集群和主题初始化

其次,我们需要一个运行中的 Kafka 集群,以便我们可以连接并执行各种 Kafka 操作。本指南假设我们的本地系统上有一个默认配置的 Kafka 集群正在运行。

最后,我们将创建一个可以用来发布和消费消息的多分区 Kafka 主题。参考我们的 创建 Kafka 主题指南,创建一个名为“baeldung”的主题:

Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Admin admin = Admin.create(adminProperties);

这里,我们创建了一个带有基本配置的 Kafka Admin 实例,由 Properties 实例定义。接下来,我们将使用这个 Admin 实例创建一个名为“baeldung”的具有五个分区的主题:

admin.createTopics(Collections.singleton(new NewTopic("baeldung", 5, (short) 1)));

现在我们已经设置了带有主题的 Kafka 集群,让我们开始发布带有键的消息。

4. 带有键的消息发布

为了演示我们的代码示例,我们首先创建一个 KafkaProducer 实例,其中包含由 Properties 实例定义的基本生产者属性。接下来,我们将使用创建的 KafkaProducer 实例发布带有键的消息,并验证主题分区。

让我们详细了解一下这些步骤。

4.1. 初始化生产者

首先,我们创建一个新的 Properties 实例,其中包含连接到本地代理的生产者的属性:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

然后,使用创建的生产者属性的 Properties 实例创建一个 KafkaProducer 实例:

KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);

KafkaProducer 类的构造函数接受一个 Properties 对象(或 Map)并返回一个 KafkaProducer 实例。

4.2. 发布消息

Kafka 生产者 API 提供了多个构造函数来创建一个带有键的 ProducerRecord 实例。我们使用 ProducerRecord<K, V>(String topic, K key, V value) 构造函数来创建带有键的消息:

ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", "Hello World");

这里,我们为“baeldung”主题创建了一个带有键的 ProducerRecord 实例。

现在,让我们向 Kafka 主题发布一些消息并验证分区:

for (int i = 1; i <= 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", String.valueOf(i));
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();

    logger.info(String.valueOf(metadata.partition()));
}

我们使用 KafkaProducer.send(ProducerRecord<String, String> record) 方法将消息发布到 Kafka。该方法返回一个 Future 类型的实例 RecordMetadata。然后,我们使用阻塞调用 Future.get() 方法,当消息发布时,它将返回一个 RecordMetadata 实例。

接下来,我们使用 RecordMetadata.partition() 方法并获取消息的分区。

上述代码片段的输出结果如下:

1
1
1
1
1
1
1
1
1
1

通过这种方式,我们验证了带有相同键的消息被发布到了同一个分区。

5. 总结

在这篇文章中,我们了解了 Kafka 消息中键的重要性。

我们首先了解了如何将带有键的消息发布到主题。然后,我们讨论了如何验证具有相同键的消息被发布到同一分区。

如往常一样,所有示例的完整代码都在 GitHub 上可获取。