2. 使用Kafka消费者API从头读取数据

2.1. 设置

在开始之前,我们首先需要设置依赖项,初始化Kafka集群连接,并向Kafka发布一些消息。

Kafka提供了一个方便的Java客户端库,我们可以使用它在Kafka集群上执行各种操作。

2.1.1. 依赖项

首先,在项目的pom.xml文件中添加Kafka客户端Java库的Maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

2.1.2. 集群和主题初始化

在整个指南中,我们将假设在本地系统上运行一个默认配置的Kafka集群。

其次,我们需要创建一个可以用于发布和消费消息的Kafka主题。请参考我们的Kafka主题创建指南来创建名为“baeldung”的主题。

现在,我们的Kafka集群已经启动并创建了主题,接下来让我们向Kafka发布一些消息。

2.1.3. 发布消息

最后,让我们向名为“baeldung”的Kafka主题发布一些示例消息。

为了发布消息,我们可以创建一个KafkaProducer实例,使用Properties实例定义的基本配置:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

然后,使用KafkaProducer.send(ProducerRecord)方法将消息发布到“baeldung”主题:

for (int i = 1; i <= 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", String.valueOf(i));
    producer.send(record);
}

我们向集群发布了10条消息,这些将用于演示我们的消费者实现。

3. 从头开始消费消息

至此,我们已经初始化了Kafka集群并发布了一些示例消息到主题。接下来,我们将学习如何从头开始读取消息。

为了演示这一点,我们首先创建一个KafkaConsumer实例,其中包含由Properties实例定义的特定消费者属性。然后,我们将使用创建的KafkaConsumer实例来消费消息,并再次回退到分区偏移量的开始。

让我们详细看看这些步骤。

3.1. 消费者属性

为了从Kafka主题的开始位置消费消息,我们创建一个KafkaConsumer实例,其中设置消费者的“group.id”属性为随机生成的UUID。这样做的原因是:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

当我们为消费者生成一个新的组ID时,消费者将始终属于由“group.id”属性标识的新消费者组。新消费者组不会有与之关联的任何偏移量。在这种情况下,Kafka提供了一个名为“auto.offset.reset”的属性,它指示当Kafka中没有初始偏移量或服务器上当前偏移量不存在时应做什么。

auto.offset.reset”属性接受以下值:

  • earliest:自动重置偏移量到最早的偏移量
  • latest:自动重置偏移量到最新的偏移量
  • none:如果找不到消费者组的先前偏移量,将抛出异常给消费者
  • 其他任何值:如果设置的值不是上述三个,将向消费者抛出异常

由于我们想从Kafka主题的开始位置读取,我们将“auto.offset.reset”属性设置为“earliest”:

consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

现在,使用消费者属性创建KafkaConsumer实例:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

我们将使用这个KafkaConsumer实例从主题的开始位置消费消息。

3.2. 消费消息

首先,我们需要订阅消费者以从主题“baeldung”消费消息:

consumer.subscribe(Arrays.asList("baeldung"));

接下来,我们*使用KafkaConsumer.poll(Duration duration)方法从“baeldung”主题中轮询新消息,直到指定的Duration参数所指的时间:*

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

for (ConsumerRecord<String, String> record : records) {
    logger.info(record.value());
}

通过这种方式,我们已经从“baeldung”主题的开始位置读取了所有消息。

此外,*要使现有的消费者重置为从主题的开始位置读取,我们可以使用KafkaConsumer.seekToBeginning(Collection partitions)方法。* 这个方法接受一个TopicPartition集合,并将消费者的偏移量指向分区的开始:

consumer.seekToBeginning(consumer.assignment());

这里,我们将*KafkaConsumer.assignment()的值传递给seekToBeginning()方法。KafkaConsumer.assignment()*方法返回当前分配给消费者的分区集。

最后,再次轮询同一个消费者获取消息,现在会从分区的开始位置读取所有消息:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

for (ConsumerRecord<String, String> record : records) {
    logger.info(record.value());
}

4. 总结

在这篇文章中,我们了解了如何使用Kafka消费者API从头读取Kafka主题中的消息。

我们首先探讨了如何让新的消费者从Kafka主题的开始位置读取消息及其实现。然后,我们看到了如何让正在消费的消费者重置其偏移量,以便从头开始读取消息。

如往常一样,所有示例的完整代码可以在GitHub上找到。