1. 概述

Kafka消费者组滞后是 任何基于Kafka的事件驱动系统的关键性能指标

在本教程中,我们将构建一个分析器应用程序来监控 Kafka 消费者延迟。

2. 消费者滞后

消费者滞后只是 消费者最后提交的偏移量和生产者在日志中的最终偏移量之间的增量 。换句话说,消费者延迟衡量任何生产者-消费者系统中生产和消费消息之间的延迟。

在本节中,让我们了解如何确定偏移值。

2.1. Kafka 管理客户端

要检查消费者组的偏移值我们需要管理 Kafka 客户端 。因此,让我们在 LagAnalyzerService 类中编写一个方法来创建 AdminClient 类的实例:

private AdminClient getAdminClient(String bootstrapServerConfig) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    return AdminClient.create(config);
}

我们必须注意使用 @Value 注释从属性文件中检索引导服务器列表。以同样的方式,我们将使用此注释来获取其他值,例如 groupId 和 topicName

2.2.消费者群体抵消

首先,我们可以 使用 AdminClient 类的 listConsumerGroupOffsets() 方法来获取特定消费者组id的偏移量信息

接下来,我们的重点主要是偏移值,因此我们可以调用 partitionsToOffsetAndMetadata() 方法来获取TopicPartition与 OffsetAndMetadata 值的映射:

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
  throws ExecutionException, InterruptedException {
    ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, Long> groupOffset = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndMetadata metadata = entry.getValue();
        groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
    }
    return groupOffset;
}

最后,我们可以注意到 topicPartitionOffsetAndMetadataMap 上的迭代将我们获取的结果限制为每个主题和分区的偏移值。

2.3.生产者抵消

找到消费者组滞后的唯一剩下的事情就是获取最终偏移值的方法。为此,我们可以使用 KafkaConsumer 类的 endOffsets() 方法。

首先,我们在 LagAnalyzerService 类中创建 KafkaConsumer 类的实例:

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(properties);
}

接下来,让我们聚合消费者组偏移量中所有相关的 TopicPartition 值,我们需要计算延迟,以便我们将其作为参数提供给 endOffsets() 方法:

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
    List<TopicPartition> topicPartitions = new LinkedList<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
        TopicPartition key = entry.getKey();
        topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
    }
    return kafkaConsumer.endOffsets(topicPartitions);
}

最后,让我们编写一个方法,使用消费者偏移量和生产者的 结束偏移量 来生成每个 TopicPartition 的滞后:

private Map<TopicPartition, Long> computeLags(
  Map<TopicPartition, Long> consumerGrpOffsets,
  Map<TopicPartition, Long> producerOffsets) {
    Map<TopicPartition, Long> lags = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
        Long producerOffset = producerOffsets.get(entry.getKey());
        Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
        long lag = Math.abs(producerOffset - consumerOffset);
        lags.putIfAbsent(entry.getKey(), lag);
    }
    return lags;
}

3. 滞后分析器

现在,让我们通过在 LagAnalyzerService 类中编写 analyzeLag() 方法来协调滞后分析:

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
    Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
    Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
    for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
        String topic = lagEntry.getKey().topic();
        int partition = lagEntry.getKey().partition();
        Long lag = lagEntry.getValue();
        LOGGER.info("Time={} | Lag for topic = {}, partition = {}, groupId = {} is {}",
          MonitoringUtil.time(),
          topic,
          partition,
          lag);
    }
}

然而,当涉及到监控滞后指标时,我们需要一个 几乎实时的滞后值,以便我们可以采取任何管理措施来恢复系统性能

实现此目的的一种直接方法是 定期轮询滞后值 。因此,让我们创建一个 LiveLagAnalyzerService 服务,它将调用 LagAnalyzerService 的 analyzeLag() 方法

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
    lagAnalyzerService.analyzeLag(groupId);
}

出于我们的目的,我们使用 @Scheduled 注释将轮询频率设置为 5 秒 。但是,为了实时监控,我们可能需要通过JMX进行访问。

4. 模拟

在本节中,我们将 模拟本地 Kafka 设置的 Kafka 生产者和消费者 ,以便我们可以看到 LagAnalyzer 的运行情况,而无需依赖外部 Kafka 生产者和消费者。

4.1.模拟模式

由于 仅出于演示目的才需要模拟模式 ,因此当我们想要针对真实场景运行滞后分析器应用程序时,我们应该有一种机制来将其关闭。

我们可以将其保留为 application.properties 资源文件中的可配置属性:

monitor.producer.simulate=true
monitor.consumer.simulate=true

我们将把这些属性插入 Kafka 生产者和消费者并控制他们的行为。

另外,我们定义生产者 startTimeendTime 和一个辅助方法 time() 来获取监控期间的当前时间:

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    LocalDateTime now = LocalDateTime.now();
    String date = dtf.format(now);
    return date;
}

4.2.生产者-消费者配置

我们需要定义一些核心配置值来实例化 Kafka 消费者和生产者模拟器的实例。

首先,我们在 KafkaConsumerConfig 类中定义消费者模拟器的配置:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
  @Qualifier("consumerFactory") ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory = 
      new ConcurrentKafkaListenerContainerFactory<>();
    listenerContainerFactory.setConsumerFactory(consumerFactory);
    return listenerContainerFactory;
}

接下来,我们可以在 KafkaProducerConfig 类中定义生产者模拟器的配置:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

此外,我们使用 @KafkaListener 注解来指定目标侦听器,当然,只有当 monitor.consumer.simulate 设置为 true 时才启用:

@KafkaListener(
  topics = "${monitor.topic.name}",
  containerFactory = "kafkaListenerContainerFactory",
  autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
    Thread.sleep(10L);
}

因此,我们添加了 10 毫秒的睡眠时间来人为地制造消费者延迟。

最后,我们编写一个 sendMessage() 方法来模拟生产者

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
    if (enabled) {
        if (endTime.after(new Date())) {
            String message = "msg-" + time();
            SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
        }
    }
}

我们可以注意到,生产者将以 1 条消息/毫秒的速率生成消息。此外,它会在模拟 开始时间30 秒结束时间 后停止生成消息。

4.3.实时监控

现在,让我们运行 LagAnalyzerApplication 中的 main 方法:

public static void main(String[] args) {
    SpringApplication.run(LagAnalyzerApplication.class, args);
    while (true) ;
}

每 30 秒后我们就会看到主题每个分区的当前延迟:

Time=2021/06/06 11:07:24 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 93
Time=2021/06/06 11:07:29 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 290
Time=2021/06/06 11:07:34 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 776
Time=2021/06/06 11:07:39 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 1159
Time=2021/06/06 11:07:44 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 1559
Time=2021/06/06 11:07:49 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 2015
Time=2021/06/06 11:07:54 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 1231
Time=2021/06/06 11:07:59 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 731
Time=2021/06/06 11:08:04 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 231
Time=2021/06/06 11:08:09 | Lag for topic = baeldungTopic, partition = 0, groupId =baeldungGrp is 0

因此,生产者生成消息的速率为 1 条消息/毫秒,高于消费者消费消息的速率。因此, 滞后将在前 30 秒开始建立,之后生产者停止生产,因此滞后将逐渐下降到 0

5. 通过执行器端点监控消费者延迟

对于具有 Kafka 消费者的 Spring Boot 应用程序,我们可以使用 Micrometer 获取消费者滞后指标并将其公开给执行器端点。让我们看看如何做到这一点。

5.1.启用执行器端点

首先,我们需要在项目的 pom.xml 文件中添加 spring-boot-starter-actuator 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <version>3.0.5</version>
</dependency>

现在,我们还 通过配置 application.properties 文件来启用 /actuator 端点

management.endpoints.web.base-path=/actuator
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

最后,我们还将应用程序的端口设置为不同于 8080

server.port=8081

我们必须注意,Zookeeper 进程在端口 8080 上运行 Web 控制台。因此,如果我们在本地计算机上运行 Zookeeper,则必须为 Spring Boot 应用程序使用不同的端口。

5.2.使用 Micrometer 配置指标

我们可以使用Micrometer库获取 Kafka 消费者指标。在本节中, 我们将公开 Prometheus 监控系统的消费者指标

首先,我们必须在项目的 pom.xml 文件中添加 micrometer-registry-prometheus 依赖项:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.10.5</version>
</dependency>

接下来,让我们为我们的应用程序启用 JMX 和 /actuator/prometheus 端点:

management.endpoint.prometheus.enabled=true
spring.jmx.enabled=false

继续,让我们将 MeterRegistry 类的实例添加为 KafkaConsumerConfig 类的成员:

@Autowired
private MeterRegistry meterRegistry;

最后,我们准备将 MicrometerConsumerListener 添加到 ConsumerFactory bean

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = getConsumerConfig(this.groupId);
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    consumerFactory.addListener(new MicrometerConsumerListener<>(this.meterRegistry));
    return consumerFactory;
}

就是这样!我们已准备好运行应用程序并监控 Kafka 消费者指标。

5.3.监控消费者滞后指标

我们可以启动应用程序并访问 /actuator/prometheus 端点来查看 kafka_consumer_* 指标。在其他指标中, kafka_consumer_fetch_manager_records_lag 指标显示当前的滞后信息

kafka_consumer_fetch_manager_records_lag{client_id="consumer-baeldungGrp-2",kafka_version="3.3.1",partition="0",spring_id="consumerFactory.consumer-baeldungGrp-2",topic="baeldung",} 21447.0

此外,让我们编写一个脚本来定期获取延迟并以几乎实时的方式显示当前延迟:

$ while true
do
curl --silent -XGET http://localhost:8081/actuator/prometheus | \
awk '/kafka_consumer_fetch_manager_records_lag{/{print "Current lag is:",$2}'; 
sleep 5;
done
Current lag is: 11375.0
Current lag is: 10875.0
Current lag is: 10375.0
Current lag is: 9875.0

伟大的!我们已经成功集成了 Kafka 消费者指标并通过执行器端点公开它们。

六,结论

在本教程中,我们了解了如何查找 Kafka 主题上的消费者滞后。此外,我们利用这些知识 在 Spring 中创建了一个 LagAnalyzer 应用程序,该应用程序几乎实时显示延迟

与往常一样,本教程的完整源代码可在 GitHub 上获取。