1. 概述
在这篇快速教程中,我们将学习如何列出 Kafka 消费者组,并一窥其详情。
2. 先决条件
为了运行此教程中的示例,我们需要一个 Kafka 集群以发送我们的请求。这可以是生产环境中的完整 Kafka 集群,也可以是用于特定测试的单实例 Kafka 集群(请参阅 /devops/kafka-list-topics#设置-kafka)。
为了简化起见,我们假设有一个监听端口 9092 的单节点集群,并且在本地主机上监听了 Zookeeper 实例(请参阅 /java-zookeeper)。此外,请注意,我们从 Kafka 安装目录运行所有示例命令。
3. 添加主题和消费者
在列出特定 Kafka 集群上的消费者之前,让我们先使用 kafka-topics.sh 脚本添加一些主题:
$ ./bin/kafka-topics.sh --create --topic users.registrations --replication-factor 1 \
--partitions 2 --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --create --topic users.verfications --replication-factor 1 \
--partitions 2 --zookeeper localhost:2181
现在,我们需要添加一些消费者组。最简单的方法是使用 Kafka 发行版中捆绑的控制台消费者:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.registrations --group new-user
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.registrations --group new-user
在这里,我们使用了 kafka-console-consumer.sh 脚本,添加了两个监听同一主题的消费者。这些消费者属于同一个组,因此来自主题分区的消息将分布在组的成员之间。这样,我们就可以在 Kafka 中实现 [竞争消费者] (https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html) 模式。
我们还需要从另一个主题消费:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.verifications
由于我们没有为消费者指定组,控制台消费者创建了一个新的组,其中只有一个成员。
接下来,我们将学习如何列出 Kafka 集群上的消费者和消费者组,并查看每个组的详细信息。
4. 列出消费者
要列出 Kafka 集群上的消费者,我们可以使用 kafka-consumer-groups.sh 脚本。*–list*
选项将列出所有消费者组:
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
new-user
console-consumer-40123
除了 *–list*
选项外,我们还传递了 *–bootstrap-server*
选项来指定 Kafka 集群地址。我们有三个单独的消费者分布在两个组中,所以结果只包含两个组。
要查看第一个组的成员,我们可以使用 *“–group <name> –describe –members”*
选项:
$ ./bin/kafka-consumer-groups.sh --describe --group new-user --members --bootstrap-server localhost:9092
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
new-user consumer-new-user-1-b90... /127.0.0.1 consumer-new-user-1 1
new-user consumer-new-user-1-af8... /127.0.0.1 consumer-new-user-1 1
在这里,我们可以看到我们的 new-user 组中有两个独立的消费者,每个消费者都从一个分区消费。
如果省略了 *–members*
选项,它将列出组中的消费者、它们正在监听的分区编号以及它们的位置偏移量:
$ ./bin/kafka-consumer-groups.sh --describe --group new-user --bootstrap-server localhost:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
new-user users.registrations 1 3 3 0
new-user users.registrations 0 5 5 0
需要注意的一件事是,此命令需要集群或 Bootstrap 服务器地址。如果我们省略集群连接信息,脚本将抛出错误:
$ ./bin/kafka-consumer-groups.sh --list
Missing required argument "[bootstrap-server]"
// truncated
5. 总结
在这篇简短的教程中,我们首先添加了一些 Kafka 主题和消费者组。然后,我们学习了如何列出消费者组并查看每个组的详细信息。