1. 概述

Apache Kafka 是一个开源的分布式事件流处理平台。

在这个快速教程中,我们将学习如何获取Kafka主题中的消息数量。我们将展示编程方法以及原生命令技术。

2. 编程技术

一个Kafka主题可能有多个分区。我们的技术需要确保我们统计了每个分区的消息数量。

首先,我们需要创建一个消费者:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

下一步是从这个消费者中获取所有分区

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
    .collect(Collectors.toList());

第三步是将消费者偏移量设置到每个分区的末尾,并将结果记录在分区映射中

consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));

最后一步是取每个分区的最新位置,并将结果相加得到主题中的消息总数

numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();

3. Kafka 原生命令

如果只是用于分析目的,编程技术需要创建服务并在机器上运行,可能会显得有些冗余。一个直接的选择是使用Kafka的原生命令,它能快速给出结果。

3.1. 使用 GetoffsetShell 命令

在执行原生命令之前,我们需要导航到机器上的Kafka根目录。以下命令返回主题“baeldung”上的发布消息数量:

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell   --broker-list localhost:9092   
--topic baeldung   | awk -F  ":" '{sum += $3} END {print "Result: "sum}'
Result: 3

3.2. 使用消费者控制台

如前所述,我们将在执行任何命令之前先导航到Kafka的根目录。以下命令返回主题“baeldung”上的发布消息数量:

$ bin/kafka-console-consumer.sh  --from-beginning  --bootstrap-server localhost:9092 
--property print.key=true --property print.value=false --property print.partition 
--topic baeldung --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Processed a total of 3 messages

4. 总结

在这篇文章中,我们探讨了获取Kafka主题中消息数量的方法。我们学习了一种编程技术,它将所有分区分配给一个消费者并检查最新的偏移量。

我们也了解了两种Kafka原生命令技术。一种是Kafka工具中的GetoffsetShell命令,另一种是在控制台上运行消费者并打印开始处的消息数量。

如往常一样,这篇文章的源代码可以在GitHub上找到。