1. Overview

In this tutorial, we’ll explore how the Kafka Consumer retrieves messages from the broker. We’ll learn the configurable properties that can directly impact how many messages the Kafka Consumer reads at once. Finally, we’ll explore how adjusting these settings affects the Consumer’s behavior.

2. Setting up the Environment

Kafka Consumers fetch records for a given partition in batches of configurable size. We cannot configure the exact number of records to be fetched in one batch, but we can configure the size of these batches, measured in bytes.
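
To make this concrete, here are the two byte-based properties we’ll tune throughout this article, shown with their default values. This is a minimal sketch using the ConsumerConfig constants from the kafka-clients library; props is simply a java.util.Properties instance like the ones we’ll build later:

// minimum amount of data the broker should return for a fetch request
props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // default: 1 byte
// maximum amount of data the broker returns per partition in one fetch
props.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // default: 1 MiB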

For the code snippets in this article, we’ll need a simple Spring application that uses the kafka-clients library to interact with the Kafka broker. We’ll create a Java class that internally uses a KafkaConsumer to subscribe to a topic and log the incoming messages. If you want to dive deeper, feel free to read through our article dedicated to the Kafka Consumer API and follow along.

One of the key differences in our example will be the logging: Instead of logging one message at a time, let’s collect them and log the whole batch. This way, we’ll be able to see exactly how many messages are fetched for each poll(). Additionally, let’s enrich the log by incorporating details like the initial and final offsets of the batch along with the consumer’s groupId:

import static java.time.Duration.ofMillis;
import static java.util.Collections.singletonList;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class VariableFetchSizeKafkaListener implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(VariableFetchSizeKafkaListener.class);
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    
    // constructor

    @Override
    public void run() {
        consumer.subscribe(singletonList(topic));
        int pollCount = 1;

        while (true) {
            // collect the records returned by this poll() so we can log the whole batch
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (var record : consumer.poll(ofMillis(500))) {
                records.add(record);
            }
            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}

The Testcontainers library will help us set up the test environment by spinning up a Docker container with a running Kafka broker. If you want to learn more about setting up the Testcontainers Kafka module, check out how we configured the test environment here and follow along.
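
For reference, the KAFKA_CONTAINER field that appears in the upcoming snippets could be declared along these lines. This is a minimal sketch; the test class name and the image tag are illustrative assumptions, so use whichever version fits your setup:

@Testcontainers
class VariableFetchSizeLiveTest {

    @Container
    static KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // tests ...
}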

In our particular case, we can define an additional method that publishes several messages to a given topic. For instance, let’s assume we are streaming values read by a temperature sensor to a topic named “engine.sensors.temperature”:

void publishTestData(int recordsCount, String topic) {
    List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .collect(toList());
    // publish all to kafka
}

As we can see, we have used the same key for all the messages. As a result, all records will be sent to the same partition. As the payload, we’ve used a short, fixed text representing a temperature measurement.
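
The elided publishing step could look like this minimal sketch, assuming a hypothetical producerProperties() helper that configures the bootstrap servers and String serializers. Since send().get() throws checked exceptions, the enclosing method would also need to declare throws Exception:

try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
    for (ProducerRecord<String, String> record : records) {
        producer.send(record).get(); // block until the broker acknowledges the record
    }
}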

3. Testing the Default Behavior

Let’s start by creating a Kafka listener using the default consumer configuration. Then, we’ll publish a few messages to see how many batches our listener consumes. As shown earlier, our custom listener uses the Consumer API internally. As a result, to instantiate VariableFetchSizeKafkaListener, we’ll have to configure and create a KafkaConsumer first:

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

For now, we’ll use KafkaConsumer’s default values for the minimum and maximum fetch sizes. Based on this consumer, we can instantiate our listener and run it asynchronously to avoid blocking the main thread:

CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);

Finally, let’s block the test thread for a few seconds, giving the listener some time to consume the messages. The goal of this article is to start the listeners and observe how they perform. We’ll use JUnit 5 tests as a convenient way of setting up and exploring their behavior, but for simplicity, we won’t include any specific assertions. As a result, this will be our starting @Test:

@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
    );

    Thread.sleep(5_000L);
}

Now, let’s run the test and inspect the logs to see how many records will be fetched in a single batch:

10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO  c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299

As we can see, we fetched all 300 records in a single batch because our messages are small. Both the key and the body are short strings: the key is four characters, and the body is 16 characters long. That’s a total of 20 bytes, plus some extra for the record’s metadata. On the other hand, the default value for the maximum partition fetch size is one mebibyte (1,024 x 1,024 bytes), or simply 1,048,576 bytes.
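
A quick back-of-the-envelope calculation confirms why a single fetch is enough; the ~50 bytes of per-record overhead below is a rough assumption, not an exact figure:

int payloadSize = "key1".length() + "temperature=255F".length(); // 4 + 16 = 20 bytes
int estimatedRecordSize = payloadSize + 50; // assuming ~50 bytes of metadata per record
int estimatedBatchSize = 300 * estimatedRecordSize; // ~21,000 bytes in total
// ~21 KB is far below the 1,048,576-byte default, so one fetch returns all 300 records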

4. Configuring Maximum Partition Fetch Size

The “max.partition.fetch.bytes” property in Kafka determines the largest amount of data that a consumer can fetch from a single partition in a single request. Consequently, even for a small number of short messages, we can force our listeners to fetch the records in multiple batches by lowering this property.

To observe this, let’s create two more VariableFetchSizeKafkaListener instances and configure them by setting this property to 500 bytes and 5 KB, respectively. First, let’s extract all the common consumer Properties into a dedicated method to avoid code duplication:

Properties commonConsumerProperties() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}

Then, let’s create the first listener and run it asynchronously:

Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);

As we can see, we are setting different consumer group IDs for the two listeners. This allows both of them to consume the same test data independently. Now, let’s proceed with the second listener and complete the test:

@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);
    
    Properties fetchSize_500B = commonConsumerProperties();
    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
    );

    Properties fetchSize_5KB = commonConsumerProperties();
    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
    );

    Thread.sleep(10_000L);
}

If we run this test, we can expect the first consumer to fetch batches roughly ten times smaller than the second one. Let’s analyze the logs:

[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]

As expected, the listener with the larger limit indeed fetches batches almost ten times larger than the other one. Moreover, it’s important to understand that the number of records within a batch depends on the size of these records and their metadata: 5,000 bytes spread across 56 records comes down to roughly 90 bytes per record, that is, the 20-byte payload plus the per-record overhead. To highlight this variability, we can observe that the consumer with groupId “max_fetch_size_5KB” fetched fewer records on its fourth poll.

5. Configuring Minimum Fetch Size

The Consumer API also allows customizing the minimum fetch size through the “fetch.min.bytes” property. We can change this property to specify the minimum amount of data a broker needs to accumulate before answering a fetch request. If this minimum isn’t met, the broker waits longer, up to the timeout configured by “fetch.max.wait.ms”, before sending a response to the consumer. To emphasize this, let’s add a delay to our test publisher within our test helper method. As a result, the producer will wait a specific number of milliseconds between sending each message:

@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic, 100L);  
    // ...
}

void publishTestData(int measurementsCount, String topic, long delayInMillis) {
    // ...
}
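
The delayed variant of the helper could be as simple as the following sketch, reusing the hypothetical producerProperties() helper from before:

void publishTestData(int measurementsCount, String topic, long delayInMillis) throws Exception {
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
        for (int i = 0; i < measurementsCount; i++) {
            producer.send(new ProducerRecord<>(topic, "key1", "temperature=255F"));
            Thread.sleep(delayInMillis); // space out the messages
        }
    }
}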

Let’s start by creating a VariableFetchSizeKafkaListener that will use the default configuration, having “fetch.min.bytes” equal to one byte. Similar to the previous examples, we’ll run this consumer asynchronously within a CompletableFuture:

// fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);

With this setup, and due to the delay we introduced, we can expect each record to be retrieved individually, one after the other. In other words, we can expect many batches containing a single record each. Also, these batches should be consumed at roughly the pace at which our KafkaProducer publishes the data, which in our case is one message every 100 milliseconds. Let’s run the test and analyze the logs:

14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]

Moreover, we can force the consumer to wait for more data to accumulate by adjusting the “fetch.min.bytes” value to a larger size:

// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);

With the property set to 500 bytes, we can expect the consumer to wait longer and fetch more data. Since the producer emits one record of roughly 90 bytes every 100 milliseconds, accumulating 500 bytes should take about half a second, which translates into batches of around five records. Let’s run this example as well and observe the outcomes:

14:24:49.303 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]

6. Conclusion

In this article, we discussed how Kafka consumers fetch data from the broker. We learned that, by default, the consumer fetches data as soon as there is at least one new record. On the other hand, if the new data for the partition exceeds 1,048,576 bytes, it is split into multiple batches of at most that size. We discovered that customizing the “fetch.min.bytes” and “max.partition.fetch.bytes” properties allows us to tailor Kafka’s behavior to suit our specific requirements.

As always, the source for the examples is available over on GitHub.