1. 概述
Kafka 是一个开源的分布式流处理中间件,它通过 发布-订阅模式 解耦消息的生产者和消费者。Kafka 的使用场景包括消息传递、监控、指标收集和流式处理等。
Kafka 中的信息是通过 Topic 来组织的。每个 Topic 是一个特定的数据流。生产者 向 Topic 写入数据,消费者 从 Topic 读取数据。多个消费者可以组成一个 消费者组(Consumer Group) 来消费一个 Topic。
在本教程中,我们将介绍如何通过命令行从 Kafka 的消费者组中移除一个特定的 Topic。我们使用的 Kafka 版本是 3.7.0。
2. 消费者组简介
一个 Kafka 消费者可以单独消费某个 Topic 的消息。但也可以将多个消费者加入同一个消费者组中。组内的每个消费者负责读取互不重叠的分区(Partition),即一个分区只能被组内的一个消费者消费。因此,整个消费者组作为一个整体消费整个 Topic。
Kafka 使用 消费者偏移量(Consumer Offsets) 来记录消费者组中每个消费者读取了分区中的哪些数据。消费者偏移量使得当某个消费者宕机或新消费者加入组时,不会丢失数据。消费者组会定期提交偏移量到 Kafka 内部的一个特殊 Topic(通常是 __consumer_offsets
)中。
3. 示例环境搭建
我们将在本地搭建一个单节点的 Kafka 集群,并使用 Kafka 自带的命令行工具完成操作。
3.1. 启动 Kafka 服务
我们使用 Kafka Raft(KRaft)模式启动 Kafka,不需要依赖 Zookeeper。
首先,使用 kafka-storage.sh
工具生成一个集群 ID:
$ kafka-storage.sh random-uuid
pNdgxrKvQAaU9x-a_dXY0A
然后,格式化 Kafka 的存储目录:
$ kafka-storage.sh format -t pNdgxrKvQAaU9x-a_dXY0A -c /home/baeldung/work/kafka/config/kraft/server.properties
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.
最后,启动 Kafka 服务:
$ kafka-server-start.sh /home/baeldung/work/kafka/config/kraft/server.properties
[2024-05-22 02:51:23,808] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
...
3.2. 创建 Topic
使用 kafka-topics.sh
创建两个 Topic:first-topic
和 second-topic
:
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic
Created topic first-topic.
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second-topic
Created topic second-topic.
查看 Topic 列表:
$ kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: first-topic TopicId: fExg0TR8SM6YTzsGoPixEA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: second-topic TopicId: LVi3UHPxRciawbnR41WWcQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: second-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
默认每个 Topic 只有一个分区。可以通过 --partitions
参数指定分区数。
3.3. 启动消费者并加入消费者组
使用 kafka-console-consumer.sh
启动两个消费者,并加入同一个消费者组 first-group
:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second-topic --group first-group
查看消费者组列表:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
first-group
3.4. 启动生产者并发送消息
使用 kafka-console-producer.sh
启动一个生产者,向 first-topic
发送消息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>
此时,消费者会接收到这两条消息:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group
Hello1
Hello2
3.5. 查看消费者组偏移量状态
使用 kafka-consumer-groups.sh --describe
查看消费者组偏移量状态:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first-group first-topic 0 2 2 0 console-consumer-753... /127.0.0.1 console-consumer
first-group second-topic 0 0 0 0 console-consumer-a69... /127.0.0.1 console-consumer
其中 LAG
表示未处理的消息数量。
3.6. 停止消费者并积累消息
停止第一个消费者(first-topic
),并向其继续发送两条消息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>Hello3
>Hello4
>
再次查看消费者组偏移量状态:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first-group second-topic 0 0 0 0 console-consumer-a69... /127.0.0.1 console-consumer
first-group first-topic 0 2 4 2
可以看到 first-topic
的 LAG
是 2,表示有两条消息未被消费。
4. 使用 --delete-offsets
移除 Topic
如果确定不再需要消费某个 Topic,可以使用 kafka-consumer-groups.sh
的 --delete-offsets
参数删除该 Topic 的偏移量信息。
执行删除命令:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group first-group --topic first-topic
Request succeed for deleting offsets with topic first-topic group first-group
TOPIC PARTITION STATUS
first-topic 0 Successful
再次查看消费者组信息:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first-group second-topic 0 0 0 0 console-consumer-a69... /127.0.0.1 console-consumer
此时,first-topic
已不在消费者组中。
查看 Kafka 中的 Topic 列表:
$ kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
first-topic
second-topic
可以看到 first-topic
仍然存在于 Kafka 中,只是不再被消费者组消费。
5. 总结
在本文中,我们介绍了如何使用 Kafka 提供的命令行工具从消费者组中移除一个特定的 Topic。
我们首先搭建了一个 Kafka 单节点环境,创建了多个 Topic 并加入了同一个消费者组。随后演示了 Kafka 如何在消费者离线时积累消息,以及如何使用 --delete-offsets
参数清理偏移量信息,从而实现从消费者组中彻底移除某个 Topic。
✅ 关键点总结:
- 使用
--delete-offsets
可以移除消费者组中某个 Topic 的偏移量。 - Topic 本身不会被删除,只是不再被消费者组消费。
- Kafka 内部使用
__consumer_offsets
Topic 来记录消费者偏移量。
❌ 注意:
- 该操作不可逆,一旦执行,相关 Topic 的偏移量将被清除。
- 如果消费者组后续再次消费该 Topic,会从默认位置(如 earliest 或 latest)开始读取。
⚠️ 建议:
- 在生产环境中执行此操作前,务必确认不再需要消费该 Topic 的历史数据。