1. 概述

Kafka 是一个开源的分布式流处理中间件,它通过 发布-订阅模式 解耦消息的生产者和消费者。Kafka 的使用场景包括消息传递、监控、指标收集和流式处理等。

Kafka 中的信息是通过 Topic 来组织的。每个 Topic 是一个特定的数据流。生产者 向 Topic 写入数据,消费者 从 Topic 读取数据。多个消费者可以组成一个 消费者组(Consumer Group) 来消费一个 Topic。

在本教程中,我们将介绍如何通过命令行从 Kafka 的消费者组中移除一个特定的 Topic。我们使用的 Kafka 版本是 3.7.0。


2. 消费者组简介

一个 Kafka 消费者可以单独消费某个 Topic 的消息。但也可以将多个消费者加入同一个消费者组中。组内的每个消费者负责读取互不重叠的分区(Partition),即一个分区只能被组内的一个消费者消费。因此,整个消费者组作为一个整体消费整个 Topic。

Kafka 使用 消费者偏移量(Consumer Offsets) 来记录消费者组中每个消费者读取了分区中的哪些数据。消费者偏移量使得当某个消费者宕机或新消费者加入组时,不会丢失数据。消费者组会定期提交偏移量到 Kafka 内部的一个特殊 Topic(通常是 __consumer_offsets)中。


3. 示例环境搭建

我们将在本地搭建一个单节点的 Kafka 集群,并使用 Kafka 自带的命令行工具完成操作。

3.1. 启动 Kafka 服务

我们使用 Kafka Raft(KRaft)模式启动 Kafka,不需要依赖 Zookeeper。

首先,使用 kafka-storage.sh 工具生成一个集群 ID:

$ kafka-storage.sh random-uuid
pNdgxrKvQAaU9x-a_dXY0A

然后,格式化 Kafka 的存储目录:

$ kafka-storage.sh format -t pNdgxrKvQAaU9x-a_dXY0A -c /home/baeldung/work/kafka/config/kraft/server.properties
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

最后,启动 Kafka 服务:

$ kafka-server-start.sh /home/baeldung/work/kafka/config/kraft/server.properties
[2024-05-22 02:51:23,808] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
...

3.2. 创建 Topic

使用 kafka-topics.sh 创建两个 Topic:first-topicsecond-topic

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic
Created topic first-topic.

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second-topic
Created topic second-topic.

查看 Topic 列表:

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: first-topic    TopicId: fExg0TR8SM6YTzsGoPixEA    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: first-topic    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
Topic: second-topic    TopicId: LVi3UHPxRciawbnR41WWcQ    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: second-topic    Partition: 0    Leader: 1    Replicas: 1    Isr: 1

默认每个 Topic 只有一个分区。可以通过 --partitions 参数指定分区数。

3.3. 启动消费者并加入消费者组

使用 kafka-console-consumer.sh 启动两个消费者,并加入同一个消费者组 first-group

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second-topic --group first-group

查看消费者组列表:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
first-group

3.4. 启动生产者并发送消息

使用 kafka-console-producer.sh 启动一个生产者,向 first-topic 发送消息:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>

此时,消费者会接收到这两条消息:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group
Hello1
Hello2

3.5. 查看消费者组偏移量状态

使用 kafka-consumer-groups.sh --describe 查看消费者组偏移量状态:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     first-topic     0          2               2               0               console-consumer-753... /127.0.0.1      console-consumer
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer

其中 LAG 表示未处理的消息数量。

3.6. 停止消费者并积累消息

停止第一个消费者(first-topic),并向其继续发送两条消息:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>Hello3
>Hello4
>

再次查看消费者组偏移量状态:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer
first-group     first-topic     0          2               4               2

可以看到 first-topicLAG 是 2,表示有两条消息未被消费。


4. 使用 --delete-offsets 移除 Topic

如果确定不再需要消费某个 Topic,可以使用 kafka-consumer-groups.sh--delete-offsets 参数删除该 Topic 的偏移量信息。

执行删除命令:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group first-group --topic first-topic
Request succeed for deleting offsets with topic first-topic group first-group

TOPIC                          PARTITION       STATUS         
first-topic                    0               Successful

再次查看消费者组信息:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer

此时,first-topic 已不在消费者组中。

查看 Kafka 中的 Topic 列表:

$ kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
first-topic
second-topic

可以看到 first-topic 仍然存在于 Kafka 中,只是不再被消费者组消费。


5. 总结

在本文中,我们介绍了如何使用 Kafka 提供的命令行工具从消费者组中移除一个特定的 Topic。

我们首先搭建了一个 Kafka 单节点环境,创建了多个 Topic 并加入了同一个消费者组。随后演示了 Kafka 如何在消费者离线时积累消息,以及如何使用 --delete-offsets 参数清理偏移量信息,从而实现从消费者组中彻底移除某个 Topic。

关键点总结:

  • 使用 --delete-offsets 可以移除消费者组中某个 Topic 的偏移量。
  • Topic 本身不会被删除,只是不再被消费者组消费。
  • Kafka 内部使用 __consumer_offsets Topic 来记录消费者偏移量。

注意:

  • 该操作不可逆,一旦执行,相关 Topic 的偏移量将被清除。
  • 如果消费者组后续再次消费该 Topic,会从默认位置(如 earliest 或 latest)开始读取。

⚠️ 建议:

  • 在生产环境中执行此操作前,务必确认不再需要消费该 Topic 的历史数据。

原始标题:How to Remove a Specific Topic From a Kafka Consumer Group