1. Overview

Kafka is an open-source distributed streaming middleware that decouples message producers and consumers using the publish-subscribe pattern. Messaging, monitoring, metrics collection, and stream processing are just a few examples of Kafka’s use cases.

Information is distributed by topics in Kafka. Each topic is a particular stream of data. Producers write data to topics, while consumers read data from topics. It’s possible to group multiple consumers to form a consumer group.

In this tutorial, we’ll discuss how to remove a specific topic from a consumer group in Kafka using the command line. The version of Kafka we use in the examples is 3.7.0.

2. Brief Information About Consumer Groups

A Kafka consumer can read messages from a topic individually. However, we can also join multiple Kafka consumers in a consumer group. Each consumer in a group reads from exclusive partitions, i.e., only one consumer reads from a partition, while the others don’t read from the same partition. Therefore, the consumer group reads the topics as a whole.

Kafka checks how much of the data in a partition has been read by a consumer in a consumer group using consumer offsets. Consumer offsets let consumers in a consumer group read messages from a topic without loss of data when an existing consumer crashes or new consumers join the group. Consumers in a consumer group produce internal topics periodically to inform about the consumer offsets.

3. Example Setup

We’ll set up a Kafka cluster consisting of a single Kafka server. All of the scripts we use come with the installation of Kafka.

3.1. Starting the Kafka Server

We’ll start the Kafka server using Kafka Raft (KRaft). First, we need to generate a cluster identifier using the kafka-storage.sh script:

$ kafka-storage.sh random-uuid
pNdgxrKvQAaU9x-a_dXY0A

The random-uuid option of *kafka-storage.*sh generates a UUID (Universally Unique Identifier) so that we can use it as a cluster identifier while running Kafka in the KRaft mode. kafka-storage.sh prints the identifier in the terminal.

Next, we format the storage using kafka-storage.sh again:

$ kafka-storage.sh format -t pNdgxrKvQAaU9x-a_dXY0A -c /home/baeldung/work/kafka/config/kraft/server.properties
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

We pass the previously generated cluster identifier and a configuration file to kafka-storage.sh using its format option to format the Kafka log directories. The configuration file is server.properties which Kafka provides. kafka-storage.sh creates and formats the /tmp/kraft-combined-logs directory, as evidenced by the last line of the output.

Now, let’s start a Kafka server using the kafka-server-start.sh script:

$ kafka-server-start.sh /home/baeldung/work/kafka/config/kraft/server.properties
[2024-05-22 02:51:23,808] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
...

We pass the configuration file, server.properties, as an argument to kafka-server-start.sh. The Kafka server starts successfully.

3.2. Creating Topics

We create topics using the kafka-topics.sh script:

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic
Created topic first-topic.

We specify the Kafka server to connect to using the –bootstrap-server option of kafka-topics.sh. Kafka listens for client connections on port 9092 by default. So, we pass localhost:9092 as the address of the Kafka server.

The –create option specifies the creation of a topic. We set the topic’s name using the –topic option. The name of the topic we create is first-topic.

Let’s create one more topic, named second-topic:

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second-topic
Created topic second-topic.

Let’s check the creation of the topics using the –describe option of kafka-topics.sh:

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: first-topic    TopicId: fExg0TR8SM6YTzsGoPixEA    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: first-topic    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
Topic: second-topic    TopicId: LVi3UHPxRciawbnR41WWcQ    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: second-topic    Partition: 0    Leader: 1    Replicas: 1    Isr: 1

The –describe option lists the details of available topics. There are two topics, first-topic and second-topic, in the cluster. Each topic has only one partition by default. However, while creating the topic, we can specify the number of partitions using the –partitions* options of *kafka-topics.sh.

3.3. Starting Consumers in a Consumer Group

Let’s start a consumer using the kafka-console-consumer.sh script:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group

The meaning of –bootstrap-server localhost:9092 is the same as it was for kafka-topics.sh — it specifies the server to connect to. The –topic option specifies the name of the topic we want to consume, which is first-topic. Finally, the –group option specifies the group identifier of the consumer group. Therefore, our consumer is in a consumer group whose group identifier is first-group.

The consumer starts and waits to receive messages from the first-topic topic. Let’s create another consumer in the same consumer group that wants to read from second-topic:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second-topic --group first-group

Now, let’s check the existence of the consumer group using the kafka-consumer-groups.sh script:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
first-group

The –list option of the kafka-consumer-groups.sh script displays the consumer groups in the Kafka cluster. Obviously, the only consumer group is first-group.

3.4. Starting a Producer

Next, let’s start a producer using the kafka-console-producer.sh script:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>

The meaning of –bootstrap-server localhost:9092 is the same as before. The –topic option specifies the name of the topic we want to produce: first-topic. The arrowhead symbol, >, shows that we’re ready to produce topics.

3.5. Producing and Consuming Topics

Let’s now send messages to first-topic using the already-running kafka-console-producer.sh script:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>

We sent two messages, Hello1 and Hello2. Let’s check the consumer waiting for messages from first-topic:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group
Hello1
Hello2

As we see from the output, the producer and consumer are working successfully.

3.6. Accumulating Messages Within Kafka

We can get more detailed information about a consumer group using the –describe option of kafka-consumer-groups.sh:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     first-topic     0          2               2               0               console-consumer-753... /127.0.0.1      console-consumer
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer

We truncate the consumer identifiers in the CONSUMER-ID column. The LAG column in the output is particularly important for us. It shows the number of messages that haven’t been processed yet by the consumers in the consumer group. Its value is 0 for first-topic since we’ve already consumed the two messages, Hello1 and Hello2. Its value is also 0 for second-topic as we haven’t written any messages to this topic yet.

Let’s now stop the consumer waiting for messages from first-topic by pressing Ctrl+C and then send two new messages, Hello3 and Hello4, to first-topic using the already-running producer:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>Hello1
>Hello2
>Hello3
>Hello4
>

Let’s check the details of the offset lags once more:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer
first-group     first-topic     0          2               4               2

As the output shows, there isn’t any consumer for first-topic. Notably, the value in the LAG column for first-topic is 2, indicating that two messages haven’t been processed yet. Those messages are Hello3 and Hello4. Kafka starts accumulating the messages in first-topic to deliver them to consumers joining the consumer group later.

If we’re sure we don’t need to read messages from first-topic anymore, accumulating messages for first-topic in the Kafka cluster is unnecessary. Additionally, if the number of those messages increases over time, we may have performance degradation in the long run. Therefore, we may want to remove first-topic from the consumer group in such a case.

4. Using the —delete-offsets Option of kafka-consumer-groups.sh

We can use the –delete-offsets option of kafka-consumer-groups.sh for removing a specific topic from a consumer group. Basically, it deletes the consumer offsets for a specific topic. It’s useful when our consumer group reads from multiple topics and we’d like to exclude a specific topic.

Let’s now delete the consumer offsets for first-topic:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group first-group --topic first-topic
Request succeed for deleting offsets with topic first-topic group first-group

TOPIC                          PARTITION       STATUS         
first-topic                    0               Successful

Consequently, we’re successful in removing first-topic from first-group. Let’s check it using the –describe option of kafka-consumer-groups.sh:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID             HOST            CLIENT-ID
first-group     second-topic    0          0               0               0               console-consumer-a69... /127.0.0.1      console-consumer

The only topic now in the consumer group is second-topic.

Finally, let’s check the topics in the Kafka cluster:

$ kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
first-topic
second-topic

Apparently, first-topic still exists within the cluster together with second-topic. There’s one more topic, namely __consumer_offsets, listed in the output. This is an internal topic used by Kafka. Kafka uses __consumer_offsets for keeping track of the last successfully processed messages (consumer offsets) within a consumer group.

5. Conclusion

In this article, we discussed how to remove a specific topic from a consumer group in Kafka using the command line.

We started with an introduction to consumer groups in Kafka. Then, we set up a Kafka cluster using the command-line utilities Kafka provides. We learned that Kafka accumulates the messages if there are no consumers for them within a consumer group.

Finally, we saw how to use the –delete-offsets option of the kafka-consumer-groups.sh script to remove a topic from a consumer group.