1. 概述
Apache Kafka 是一个开源的分布式流处理系统,它具有容错能力和高吞吐量。Kafka 实质上是一个实现发布者-订阅者模型的消息系统。 它的 messaging、存储和流处理能力使我们能够大规模地存储和分析实时数据流。
在这个教程中,我们将首先了解在 Kafka 消息中键的重要性。然后,我们将学习如何将带有键的消息发布到 Kafka 主题。
2. Kafka 消息中键的重要作用
如我们所知,Kafka 有效地按我们生成记录的顺序存储一系列记录。
当我们向 Kafka 主题发布消息时,它们会轮询方式均匀分布到可用分区中。 因此,在 Kafka 主题内,分区内的消息顺序是有保证的,但跨分区则不保证。
当我们带有键的消息发布到 Kafka 主题时,所有具有相同键的消息都将被 Kafka 保证存储在同一分区中。因此,如果希望维护具有相同键的消息顺序,Kafka 消息中的键就很有用。
总结来说,发送消息到 Kafka 时,键并非强制要求。基本上,如果我们希望严格保持具有相同键的消息顺序,那么在消息中使用键是必要的。对于所有其他情况,拥有空键将更好地将消息分布在分区之间。
接下来,让我们立即深入到一些带有 Kafka 消息键的实际实现代码。
3. 设置
在开始之前,让我们先初始化一个 Kafka 集群,设置依赖项,并与 Kafka 集群建立连接。
Kafka 的 Java 库提供了易于使用的生产者和消费者 API,我们可以使用这些 API 来从 Kafka 发布和消费消息。
3.1. 依赖项
首先,我们需要在项目 的 pom.xml
文件中添加 Kafka 客户端 Java 库的 Maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
3.2. 集群和主题初始化
其次,我们需要一个运行中的 Kafka 集群,以便我们可以连接并执行各种 Kafka 操作。本指南假设我们的本地系统上有一个默认配置的 Kafka 集群正在运行。
最后,我们将创建一个可以用来发布和消费消息的多分区 Kafka 主题。参考我们的 创建 Kafka 主题指南,创建一个名为“baeldung”的主题:
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Admin admin = Admin.create(adminProperties);
这里,我们创建了一个带有基本配置的 Kafka Admin 实例,由 Properties 实例定义。接下来,我们将使用这个 Admin 实例创建一个名为“baeldung”的具有五个分区的主题:
admin.createTopics(Collections.singleton(new NewTopic("baeldung", 5, (short) 1)));
现在我们已经设置了带有主题的 Kafka 集群,让我们开始发布带有键的消息。
4. 带有键的消息发布
为了演示我们的代码示例,我们首先创建一个 KafkaProducer 实例,其中包含由 Properties 实例定义的基本生产者属性。接下来,我们将使用创建的 KafkaProducer 实例发布带有键的消息,并验证主题分区。
让我们详细了解一下这些步骤。
4.1. 初始化生产者
首先,我们创建一个新的 Properties 实例,其中包含连接到本地代理的生产者的属性:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
然后,使用创建的生产者属性的 Properties 实例创建一个 KafkaProducer 实例:
KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);
KafkaProducer 类的构造函数接受一个 Properties 对象(或 Map)并返回一个 KafkaProducer 实例。
4.2. 发布消息
Kafka 生产者 API 提供了多个构造函数来创建一个带有键的 ProducerRecord 实例。我们使用 ProducerRecord<K, V>(String topic, K key, V value) 构造函数来创建带有键的消息:
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", "Hello World");
这里,我们为“baeldung”主题创建了一个带有键的 ProducerRecord 实例。
现在,让我们向 Kafka 主题发布一些消息并验证分区:
for (int i = 1; i <= 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", String.valueOf(i));
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
logger.info(String.valueOf(metadata.partition()));
}
我们使用 KafkaProducer.send(ProducerRecord<String, String> record) 方法将消息发布到 Kafka。该方法返回一个 Future 类型的实例 RecordMetadata。然后,我们使用阻塞调用 Future
接下来,我们使用 RecordMetadata.partition() 方法并获取消息的分区。
上述代码片段的输出结果如下:
1
1
1
1
1
1
1
1
1
1
通过这种方式,我们验证了带有相同键的消息被发布到了同一个分区。
5. 总结
在这篇文章中,我们了解了 Kafka 消息中键的重要性。
我们首先了解了如何将带有键的消息发布到主题。然后,我们讨论了如何验证具有相同键的消息被发布到同一分区。
如往常一样,所有示例的完整代码都在 GitHub 上可获取。