概述
使用Apache Kafka的客户端应用程序通常可以分为两类:生产者和消费者。无论是生产者还是消费者,它们在开始生产和消费工作之前,都需要底层的Kafka服务器运行。
本文将学习一些策略来检查Kafka服务器是否正在运行。
2. 使用Zookeeper命令
快速了解是否有活跃broker的一个方法是使用Zookeeper的dump
命令。**dump
命令是可用于管理Zookeeper服务器的4LW命令之一。**
让我们使用nc
命令通过监听2181端口的Zookeeper服务器发送dump
命令:
$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0
执行命令后,我们会看到注册在Zookeeper服务器上的临时broker ID列表。如果没有临时ID存在,则说明没有broker节点运行。
此外,需要注意的是,dump
命令需要在通常位于zookeeper.properties
或zoo.cfg
配置文件中的配置中明确启用:
lw.commands.whitelist=dump
另一种选择是,我们也可以使用Zookeeper API来获取集群中活跃broker的列表。
3. 使用Apache Kafka的AdminClient
如果我们的生产者或消费者是Java应用,我们可以利用Apache Kafka的AdminClient
类来判断Kafka服务器是否运行。
首先,定义一个KafkaAdminClient
类,封装AdminClient
类的实例,以便于快速测试代码:
public class KafkaAdminClient {
private final AdminClient client;
public KafkaAdminClient(String bootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
props.put("request.timeout.ms", 3000);
props.put("connections.max.idle.ms", 5000);
this.client = AdminClient.create(props);
}
}
接下来,在KafkaAdminClient
类中定义verifyConnection()
方法,用于验证客户端是否能连接到运行中的broker服务器:
public boolean verifyConnection() throws ExecutionException, InterruptedException {
Collection<Node> nodes = this.client.describeCluster()
.nodes()
.get();
return nodes != null && nodes.size() > 0;
}
最后,通过连接运行中的Kafka集群来测试我们的代码:
@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
boolean alive = kafkaAdminClient.verifyConnection();
assertThat(alive).isTrue();
}
4. 使用kcat
实用工具
我们可以使用kcat
(以前称为kafkacat
)命令来确定是否有运行的Kafka broker节点。为此,使用-L
选项显示现有主题的元数据:
$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at 192.168.1.53:9092 (controller)
1 topics:
topic "demo-topic" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
然后,当broker节点不运行时执行相同的命令:
$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)
在这种情况下,由于没有运行的broker节点,我们将收到“连接拒绝”错误。另外,我们还必须注意,通过使用-m
选项限制请求超时为1秒,我们能够快速失败。
5. 使用UI工具
对于不需要自动检查的实验性PoC项目,我们可以依赖于UI工具,如Offset Explorer。然而,如果需要验证企业级Kafka客户端的broker节点状态,这种方法并不推荐。
让我们使用Offset Explorer使用Zookeeper主机和端口详细信息连接到Kafka集群:
左侧边栏会显示运行中的broker列表。就这样,我们只需点击一下就能完成。
6. 总结
在这篇教程中,我们探讨了使用Zookeeper命令、Apache的AdminClient
和kcat
工具的命令行方法,以及基于UI的验证Kafka服务器状态的方法。
一如既往,完整教程的源代码可在GitHub上找到。