1. 概述
Apache Kafka 是一个开源的分布式事件流处理平台。
在这个快速教程中,我们将学习如何获取Kafka主题中的消息数量。我们将展示编程方法以及原生命令技术。
2. 编程技术
一个Kafka主题可能有多个分区。我们的技术需要确保我们统计了每个分区的消息数量。
首先,我们需要创建一个消费者:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
下一步是从这个消费者中获取所有分区:
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
第三步是将消费者偏移量设置到每个分区的末尾,并将结果记录在分区映射中:
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));
最后一步是取每个分区的最新位置,并将结果相加得到主题中的消息总数:
numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
3. Kafka 原生命令
如果只是用于分析目的,编程技术需要创建服务并在机器上运行,可能会显得有些冗余。一个直接的选择是使用Kafka的原生命令,它能快速给出结果。
3.1. 使用 GetoffsetShell
命令
在执行原生命令之前,我们需要导航到机器上的Kafka根目录。以下命令返回主题“baeldung”上的发布消息数量:
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092
--topic baeldung | awk -F ":" '{sum += $3} END {print "Result: "sum}'
Result: 3
3.2. 使用消费者控制台
如前所述,我们将在执行任何命令之前先导航到Kafka的根目录。以下命令返回主题“baeldung”上的发布消息数量:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092
--property print.key=true --property print.value=false --property print.partition
--topic baeldung --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Processed a total of 3 messages
4. 总结
在这篇文章中,我们探讨了获取Kafka主题中消息数量的方法。我们学习了一种编程技术,它将所有分区分配给一个消费者并检查最新的偏移量。
我们也了解了两种Kafka原生命令技术。一种是Kafka工具中的GetoffsetShell
命令,另一种是在控制台上运行消费者并打印开始处的消息数量。
如往常一样,这篇文章的源代码可以在GitHub上找到。