概述

使用Apache Kafka的客户端应用程序通常可以分为两类:生产者和消费者。无论是生产者还是消费者,它们在开始生产和消费工作之前,都需要底层的Kafka服务器运行。

本文将学习一些策略来检查Kafka服务器是否正在运行。

2. 使用Zookeeper命令

快速了解是否有活跃broker的一个方法是使用Zookeeper的dump命令。**dump命令是可用于管理Zookeeper服务器的4LW命令之一。**

让我们使用nc命令通过监听2181端口的Zookeeper服务器发送dump命令:

$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0

执行命令后,我们会看到注册在Zookeeper服务器上的临时broker ID列表。如果没有临时ID存在,则说明没有broker节点运行。

此外,需要注意的是,dump命令需要在通常位于zookeeper.propertieszoo.cfg配置文件中的配置中明确启用:

lw.commands.whitelist=dump

另一种选择是,我们也可以使用Zookeeper API来获取集群中活跃broker的列表

3. 使用Apache Kafka的AdminClient

如果我们的生产者或消费者是Java应用,我们可以利用Apache Kafka的AdminClient类来判断Kafka服务器是否运行。

首先,定义一个KafkaAdminClient类,封装AdminClient类的实例,以便于快速测试代码:

public class KafkaAdminClient {
    private final AdminClient client;

    public KafkaAdminClient(String bootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("request.timeout.ms", 3000);
        props.put("connections.max.idle.ms", 5000);

        this.client = AdminClient.create(props);
    }
}

接下来,在KafkaAdminClient类中定义verifyConnection()方法,用于验证客户端是否能连接到运行中的broker服务器:

public boolean verifyConnection() throws ExecutionException, InterruptedException {
    Collection<Node> nodes = this.client.describeCluster()
      .nodes()
      .get();
    return nodes != null && nodes.size() > 0;
}

最后,通过连接运行中的Kafka集群来测试我们的代码:

@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
    boolean alive = kafkaAdminClient.verifyConnection();
    assertThat(alive).isTrue();
}

4. 使用kcat实用工具

我们可以使用kcat(以前称为kafkacat)命令来确定是否有运行的Kafka broker节点。为此,使用-L选项显示现有主题的元数据

$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at 192.168.1.53:9092 (controller)
 1 topics:
  topic "demo-topic" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

然后,当broker节点不运行时执行相同的命令:

$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)

在这种情况下,由于没有运行的broker节点,我们将收到“连接拒绝”错误。另外,我们还必须注意,通过使用-m选项限制请求超时为1秒,我们能够快速失败。

5. 使用UI工具

对于不需要自动检查的实验性PoC项目,我们可以依赖于UI工具,如Offset Explorer。然而,如果需要验证企业级Kafka客户端的broker节点状态,这种方法并不推荐。

让我们使用Offset Explorer使用Zookeeper主机和端口详细信息连接到Kafka集群:

左侧边栏会显示运行中的broker列表。就这样,我们只需点击一下就能完成。

6. 总结

在这篇教程中,我们探讨了使用Zookeeper命令、Apache的AdminClientkcat工具的命令行方法,以及基于UI的验证Kafka服务器状态的方法。

一如既往,完整教程的源代码可在GitHub上找到。