1. 概述

本文将深入介绍 Kafka 提供的 MockConsumer——一个专为测试设计的消费者实现类。

我们会先分析测试 Kafka 消费者时需要关注的核心点,然后通过实际示例展示如何利用 MockConsumer 快速构建可靠、轻量的单元测试。✅

2. 为什么需要测试 Kafka Consumer?

典型的 Kafka 消费逻辑包含两个关键步骤:

  1. 订阅主题(subscribe)或手动分配分区(assign)
  2. 在循环中调用 poll() 方法拉取消息批次

以下是一个典型的消费循环示例:

void consume() {
    try {
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> processRecord(record));
        }
    } catch (WakeupException ex) {
        // ignore for shutdown
    } catch (RuntimeException ex) {
        // exception handling
    } finally {
        consumer.close();
    }
}

从这段代码可以看出,我们至少需要验证以下几个方面:

  • ✅ 是否正确订阅了指定 topic
  • ✅ 消息是否被正常拉取并处理
  • ✅ 异常是否被捕获和处理(如 WakeupException、KafkaException)
  • close() 是否在 finally 块中被调用,防止资源泄漏

可选的测试方案对比

方案 优点 缺点
内嵌 Kafka 实例 接近真实环境 ❌ 启动慢、资源占用高、测试不稳定
Mock 框架(如 Mockito) 轻量、快速 ❌ 需手动模拟 poll()subscribe() 等行为,容易出错
Kafka 自带 MockConsumer ✅ 完整实现 Consumer 接口,行为逼真,易用 基本无缺点,推荐首选

🔥 结论:MockConsumer 是测试 Kafka 消费者的最佳选择——简单粗暴且高效。

3. 使用 MockConsumer 实战

MockConsumerkafka-clients 库中提供的一个真实类,它实现了标准的 Consumer 接口。这意味着你可以把它当作真正的 KafkaConsumer 来用,但所有行为都是可编程控制的。

示例场景:国家人口数据消费者

我们假设有一个服务,消费 Kafka 中的国家人口更新消息:

class CountryPopulation {
    private String country;
    private Integer population;

    // standard constructor, getters and setters
}

对应的消费者实现如下:

public class CountryPopulationConsumer {

    private Consumer<String, Integer> consumer;
    private java.util.function.Consumer<Throwable> exceptionConsumer;
    private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;

    // standard constructor

    void startBySubscribing(String topic) {
        consume(() -> consumer.subscribe(Collections.singleton(topic)));
    }

    void startByAssigning(String topic, int partition) {
        consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
    }

    private void consume(Runnable beforePollingTask) {
        try {
            beforePollingTask.run();
            while (true) {
                ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
                StreamSupport.stream(records.spliterator(), false)
                    .map(record -> new CountryPopulation(record.key(), record.value()))
                    .forEach(countryPopulationConsumer);
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println("Shutting down...");
        } catch (RuntimeException ex) {
            exceptionConsumer.accept(ex);
        } finally {
            consumer.close();
        }
    }

    public void stop() {
        consumer.wakeup();
    }
}

接下来我们将为这个消费者编写测试用例。

3.1 创建 MockConsumer 实例

测试前的通用初始化:

@BeforeEach
void setUp() {
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    updates = new ArrayList<>();
    countryPopulationConsumer = new CountryPopulationConsumer(consumer, 
      ex -> this.pollException = ex, updates::add);
}

⚠️ 注意:创建 MockConsumer 时必须指定 OffsetResetStrategy,通常使用 EARLIESTLATEST

我们通过 updates 收集消费到的数据,通过 pollException 捕获异常,便于后续断言。


3.2 测试 assign 分区模式

@Test
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

关键点解析:

  • 不能在 assign 前添加 recordMockConsumer 会校验 record 是否属于已 assign 的分区。
  • ✅ 解决方案:使用 schedulePollTask() 延迟添加 record,确保 assign 已完成。
  • updateBeginningOffsets() 设置起始 offset,避免因 offset 不匹配导致无法消费。
  • ✅ 第二次 poll 时调用 stop() 触发 WakeupException,退出无限循环。

3.3 测试 subscribe 订阅模式

@Test
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> {
        consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
        consumer.addRecord(record("Romania", 1000, TOPIC, 0));
    });
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

与 assign 的区别:

  • ✅ 必须先调用 rebalance() 模拟消费者组重平衡,否则 poll() 不会返回数据。
  • ⚠️ rebalance() 参数传入当前消费者应分配到的 TopicPartition 列表。

3.4 控制 polling 循环的三种方式

方式 方法 适用场景
✅ 调度任务 schedulePollTask(Runnable) 添加 record、触发 rebalance、模拟异常等
✅ 唤醒中断 wakeup() 模拟正常关闭流程
✅ 抛出异常 setPollException(Throwable) 验证异常处理逻辑

示例:测试异常处理

@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
    assertThat(consumer.closed()).isTrue();
}

💡 踩坑提示:setPollException 设置的异常会在下一次 poll() 调用时抛出,非常适合模拟网络中断、序列化失败等场景。


3.5 模拟 end offsets 与分区元数据

如果你的消费逻辑依赖 end offsetspartitionsFor(),也可以用 MockConsumer 模拟:

// 模拟 end offset
TopicPartition tp = new TopicPartition(TOPIC, 0);
consumer.updateEndOffsets(Collections.singletonMap(tp, 100L));

// 模拟分区信息
List<PartitionInfo> partitions = Arrays.asList(
    new PartitionInfo(TOPIC, 0, null, null, null),
    new PartitionInfo(TOPIC, 1, null, null, null)
);
consumer.updatePartitions(TOPIC, partitions);

这样调用 position()endOffsets()partitionsFor() 时就能返回预设值,无需依赖真实 Kafka 集群。

4. 总结

MockConsumer 是测试 Kafka 消费者逻辑的利器,具备以下优势:

  • ✅ 行为真实:完整实现 Consumer 接口,无需手动 mock 复杂逻辑
  • ✅ 轻量快速:无需启动 Kafka 集群,测试秒级运行
  • ✅ 精准控制:可编程控制 record、offset、rebalance、异常等
  • ✅ 易于集成:与 JUnit、AssertJ 等主流测试框架无缝配合

📌 建议:所有 Kafka 消费者单元测试都应优先使用 MockConsumer,避免走通 Kafka 集群的“重方案”,提升测试稳定性和开发效率。

文中所有代码示例均可在 GitHub 获取:https://github.com/yourname/kafka-testing-samples


原始标题:Using Kafka MockConsumer