1. 概述

在这个简短的教程中,我们将了解如何从Apache Kafka主题中获取最后N条消息。

在文章的第一部分,我们将专注于执行此操作所需的先决条件。在第二部分,我们将使用Java和Apache Kafka Java API库构建一个小型实用工具来读取消息。最后,我们将提供使用Apache KafkaCat命令行工具实现相同结果的简要指南。

2. 先决条件

从Kafka主题中获取最后N条消息,就像从定义好的偏移量开始消费消息一样简单。 Kafka主题中的偏移量指示消费者的位置。在之前的文章中,我们已经看到如何利用*consumer.seekToEnd()*方法从特定主题中获取一定数量的消息。

考虑到相同的功能,我们可以通过简单的减法来理解如何计算正确的偏移量:offset = lastOffset - N。然后,我们可以从此位置开始轮询N条消息。

然而,如果使用事务性生产者生成记录,这种方法将不起作用。在这种情况下,偏移量会跳过一些数字以适应Kafka主题的事务性记录(提交/回滚等)。事务性生产者通常用于需要精确一次处理Kafka消息的情况。简单地说,如果我们从(lastOffset - N)开始读取,可能会消费到少于N条消息,因为一些偏移量被事务性记录消耗了【参见Apache Kafka问题记录KAFKA-10009】。

3. 使用Java获取Kafka主题中的最后N条消息

首先,我们需要创建一个生产者和一个消费者

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

现在让我们产生一些消息:

final String TOPIC1 = "baeldung-topic";
int messagesInTopic = 100;
for (int i = 0; i < messagesInTopic; i++) {
    producer.send(new ProducerRecord(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))).get();
}

为了清晰和简单起见,假设我们只需要为消费者注册一个分区:

TopicPartition partition = new TopicPartition(TOPIC1, 0);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition);
consumer.assign(partitions);

正如我们之前提到的,我们需要将偏移量定位在正确的位置,然后开始轮询:

int messagesToRetrieve = 10;
consumer.seekToEnd(partitions);
long startIndex = consumer.position(partition) - messagesToRetrieve;
consumer.seek(partition, startIndex);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

如果网络特别慢,或者要检索的消息数量特别大,我们可能需要增加轮询时间。在这种情况下,我们需要考虑内存中存在大量记录可能导致资源短缺的问题。

现在,让我们最后检查是否确实获取到了正确数量的消息:

for (ConsumerRecord<String, String> record : records) {
    assertEquals(MESSAGE_KEY, record.key());
    assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve));
    recordsReceived++;
}
assertEquals(messagesToRetrieve, recordsReceived);

4. 使用KafkaCat获取Kafka主题中的最后N条消息

KafkaCat(kcat)是一个可用于测试和调试Kafka主题的命令行工具。Kafka本身提供了许多脚本和shell工具来执行相同的操作,但KafkaCat的简单易用性使其在诸如从Apache Kafka主题中获取最后N条消息这样的操作上成为事实上的标准。一旦安装了它,我们可以通过运行这个简单的命令来获取Kafka主题中最近产生的N条消息:

$ kafkacat -C -b localhost:9092 -t topic-name -o -<N> -e
  • -C表示我们需要消费消息
  • -b指定Kafka代理的地址
  • -t指定主题名称
  • -o表示我们需要从这个偏移量开始读取。带有负号表示从末尾读取N条消息
  • -e选项在读取最后一条消息后退出

回到我们之前讨论的案例,从名为"baeldung-topic"的主题中获取最后10条消息的命令是:

$ kafkacat -C -b localhost:9092 -t baeldung-topic -o -10 -e

5. 总结

在这篇简短的教程中,我们了解了如何从Kafka主题中消费最新N条消息。在第一部分,我们使用了Java Kafka API库。在第二部分,我们使用了一个名为KafkaCat的命令行实用程序。

如往常一样,代码可在GitHub上找到。