2. 使用Kafka消费者API从头读取数据
2.1. 设置
在开始之前,我们首先需要设置依赖项,初始化Kafka集群连接,并向Kafka发布一些消息。
Kafka提供了一个方便的Java客户端库,我们可以使用它在Kafka集群上执行各种操作。
2.1.1. 依赖项
首先,在项目的pom.xml
文件中添加Kafka客户端Java库的Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2.1.2. 集群和主题初始化
在整个指南中,我们将假设在本地系统上运行一个默认配置的Kafka集群。
其次,我们需要创建一个可以用于发布和消费消息的Kafka主题。请参考我们的Kafka主题创建指南来创建名为“baeldung”的主题。
现在,我们的Kafka集群已经启动并创建了主题,接下来让我们向Kafka发布一些消息。
2.1.3. 发布消息
最后,让我们向名为“baeldung”的Kafka主题发布一些示例消息。
为了发布消息,我们可以创建一个KafkaProducer实例,使用Properties实例定义的基本配置:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
然后,使用KafkaProducer.send(ProducerRecord)方法将消息发布到“baeldung”主题:
for (int i = 1; i <= 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", String.valueOf(i));
producer.send(record);
}
我们向集群发布了10条消息,这些将用于演示我们的消费者实现。
3. 从头开始消费消息
至此,我们已经初始化了Kafka集群并发布了一些示例消息到主题。接下来,我们将学习如何从头开始读取消息。
为了演示这一点,我们首先创建一个KafkaConsumer实例,其中包含由Properties实例定义的特定消费者属性。然后,我们将使用创建的KafkaConsumer实例来消费消息,并再次回退到分区偏移量的开始。
让我们详细看看这些步骤。
3.1. 消费者属性
为了从Kafka主题的开始位置消费消息,我们创建一个KafkaConsumer实例,其中设置消费者的“group.id”属性为随机生成的UUID。这样做的原因是:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
当我们为消费者生成一个新的组ID时,消费者将始终属于由“group.id”属性标识的新消费者组。新消费者组不会有与之关联的任何偏移量。在这种情况下,Kafka提供了一个名为“auto.offset.reset”的属性,它指示当Kafka中没有初始偏移量或服务器上当前偏移量不存在时应做什么。
“auto.offset.reset”属性接受以下值:
- earliest:自动重置偏移量到最早的偏移量
- latest:自动重置偏移量到最新的偏移量
- none:如果找不到消费者组的先前偏移量,将抛出异常给消费者
- 其他任何值:如果设置的值不是上述三个,将向消费者抛出异常
由于我们想从Kafka主题的开始位置读取,我们将“auto.offset.reset”属性设置为“earliest”:
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
现在,使用消费者属性创建KafkaConsumer实例:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
我们将使用这个KafkaConsumer实例从主题的开始位置消费消息。
3.2. 消费消息
首先,我们需要订阅消费者以从主题“baeldung”消费消息:
consumer.subscribe(Arrays.asList("baeldung"));
接下来,我们*使用KafkaConsumer.poll(Duration duration)方法从“baeldung”主题中轮询新消息,直到指定的Duration参数所指的时间:*
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
logger.info(record.value());
}
通过这种方式,我们已经从“baeldung”主题的开始位置读取了所有消息。
此外,*要使现有的消费者重置为从主题的开始位置读取,我们可以使用KafkaConsumer.seekToBeginning(Collection
consumer.seekToBeginning(consumer.assignment());
这里,我们将*KafkaConsumer.assignment()的值传递给seekToBeginning()方法。KafkaConsumer.assignment()*方法返回当前分配给消费者的分区集。
最后,再次轮询同一个消费者获取消息,现在会从分区的开始位置读取所有消息:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
logger.info(record.value());
}
4. 总结
在这篇文章中,我们了解了如何使用Kafka消费者API从头读取Kafka主题中的消息。
我们首先探讨了如何让新的消费者从Kafka主题的开始位置读取消息及其实现。然后,我们看到了如何让正在消费的消费者重置其偏移量,以便从头开始读取消息。
如往常一样,所有示例的完整代码可以在GitHub上找到。