1. 概述
在这个简短的教程中,我们将了解如何从Apache Kafka主题中获取最后N条消息。
在文章的第一部分,我们将专注于执行此操作所需的先决条件。在第二部分,我们将使用Java和Apache Kafka Java API库构建一个小型实用工具来读取消息。最后,我们将提供使用Apache KafkaCat命令行工具实现相同结果的简要指南。
2. 先决条件
从Kafka主题中获取最后N条消息,就像从定义好的偏移量开始消费消息一样简单。 Kafka主题中的偏移量指示消费者的位置。在之前的文章中,我们已经看到如何利用*consumer.seekToEnd()*方法从特定主题中获取一定数量的消息。
考虑到相同的功能,我们可以通过简单的减法来理解如何计算正确的偏移量:offset = lastOffset - N。然后,我们可以从此位置开始轮询N条消息。
然而,如果使用事务性生产者生成记录,这种方法将不起作用。在这种情况下,偏移量会跳过一些数字以适应Kafka主题的事务性记录(提交/回滚等)。事务性生产者通常用于需要精确一次处理Kafka消息的情况。简单地说,如果我们从(lastOffset - N)开始读取,可能会消费到少于N条消息,因为一些偏移量被事务性记录消耗了【参见Apache Kafka问题记录KAFKA-10009】。
3. 使用Java获取Kafka主题中的最后N条消息
首先,我们需要创建一个生产者和一个消费者:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
现在让我们产生一些消息:
final String TOPIC1 = "baeldung-topic";
int messagesInTopic = 100;
for (int i = 0; i < messagesInTopic; i++) {
producer.send(new ProducerRecord(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))).get();
}
为了清晰和简单起见,假设我们只需要为消费者注册一个分区:
TopicPartition partition = new TopicPartition(TOPIC1, 0);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition);
consumer.assign(partitions);
正如我们之前提到的,我们需要将偏移量定位在正确的位置,然后开始轮询:
int messagesToRetrieve = 10;
consumer.seekToEnd(partitions);
long startIndex = consumer.position(partition) - messagesToRetrieve;
consumer.seek(partition, startIndex);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
如果网络特别慢,或者要检索的消息数量特别大,我们可能需要增加轮询时间。在这种情况下,我们需要考虑内存中存在大量记录可能导致资源短缺的问题。
现在,让我们最后检查是否确实获取到了正确数量的消息:
for (ConsumerRecord<String, String> record : records) {
assertEquals(MESSAGE_KEY, record.key());
assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve));
recordsReceived++;
}
assertEquals(messagesToRetrieve, recordsReceived);
4. 使用KafkaCat获取Kafka主题中的最后N条消息
KafkaCat(kcat)是一个可用于测试和调试Kafka主题的命令行工具。Kafka本身提供了许多脚本和shell工具来执行相同的操作,但KafkaCat的简单易用性使其在诸如从Apache Kafka主题中获取最后N条消息这样的操作上成为事实上的标准。一旦安装了它,我们可以通过运行这个简单的命令来获取Kafka主题中最近产生的N条消息:
$ kafkacat -C -b localhost:9092 -t topic-name -o -<N> -e
- -C表示我们需要消费消息
- -b指定Kafka代理的地址
- -t指定主题名称
- -o表示我们需要从这个偏移量开始读取。带有负号表示从末尾读取N条消息
- -e选项在读取最后一条消息后退出
回到我们之前讨论的案例,从名为"baeldung-topic"的主题中获取最后10条消息的命令是:
$ kafkacat -C -b localhost:9092 -t baeldung-topic -o -10 -e
5. 总结
在这篇简短的教程中,我们了解了如何从Kafka主题中消费最新N条消息。在第一部分,我们使用了Java Kafka API库。在第二部分,我们使用了一个名为KafkaCat的命令行实用程序。
如往常一样,代码可在GitHub上找到。