1. 概述

在本教程中,我们将简要介绍Apache Kafka ,然后了解如何以编程方式在 Kafka 集群中创建和配置主题。

2.卡夫卡简介

Apache Kafka 是一个功能强大、高性能的分布式事件流平台。

通常,生产者应用程序将事件发布到 Kafka,而消费者则订阅这些事件以读取和处理它们。 Kafka 使用 主题来存储和分类这些事件, 例如,在电子商务应用程序中,可能有一个“订单”主题。

Kafka 主题是分区的,它将数据分布在多个代理之间以实现可扩展性。它们可以被复制,以使数据具有容错性和高可用性。即使在消费后,主题也会根据需要保留事件。这一切都是 通过 Kafka 命令行工具和键值配置按主题进行管理的

但是,除了命令行工具之外, Kafka 还提供了管理 API来管理和检查主题、代理和其他 Kafka 对象 。在我们的示例中,我们将使用此 API 来创建新主题。

3. 依赖关系

要使用 Admin API,我们将kafka-clients 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

4. 设置卡夫卡

在创建新主题之前,我们至少需要一个单节点的Kafka集群。

在本教程中,我们将使用Testcontainers框架来实例化 Kafka 容器。然后,我们可以运行可靠且独立的集成测试,这些测试不依赖于外部 Kafka 服务器的运行。为此,我们还需要两个专门用于测试的依赖项。

首先,让我们将Testcontainers Kafka 依赖项添加到 pom.xml 中:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

接下来,我们将添加junit-jupiter 工件以使用 JUnit 5 运行 Testcontainer 测试:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

现在我们已经配置了所有必要的依赖项,我们可以编写一个简单的应用程序来以编程方式创建新主题。

5. 管理API

让我们首先为本地代理创建一个具有最少配置的新 Properties 实例:

Properties properties = new Properties();
properties.put(
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);

现在我们可以获得一个 Admin 实例:

Admin admin = Admin.create(properties)

create 方法接受具有 bootstrap.servers 属性的 Properties 对象(或 Map) ,并返回一个线程安全实例。

管理客户端使用此属性来发现集群中的代理并随后执行任何管理操作。因此,通常包含两个或三个代理地址就足够了,以覆盖某些实例不可用的可能性。

AdminClientConfig 类包含所有管理客户端配置条目的常量。

6. 话题创建

首先,我们使用 Testcontainers 创建 JUnit 5 测试来验证主题创建是否成功。我们将利用Kafka 模块,该模块使用Confluence OSS 平台的官方 Kafka Docker 镜像:

@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
    kafkaTopicApplication.createTopic("test-topic");

    String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
    String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
      .getStdout();

    assertThat(stdout).contains("test-topic");
}

在这里,Testcontainers 将在测试执行期间自动实例化和管理 Kafka 容器。我们只需调用应用程序代码并验证主题是否已在运行的容器中成功创建。

6.1.使用默认选项创建

主题分区和复制因子是新主题的关键考虑因素。我们将保持简单并使用 1 个分区和 1 的复制因子创建示例主题:

try (Admin admin = Admin.create(properties)) {
    int partitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
    
    CreateTopicsResult result = admin.createTopics(
      Collections.singleton(newTopic)
    );

    KafkaFuture<Void> future = result.values().get(topicName);
    future.get();
}

在这里,我们使用了 管理员。 createTopics 方法使用默认选项创建一批新主题。 由于 Admin 接口扩展了 AutoCloseable 接口,我们使用了try-with-resources来执行我们的操作。这确保了资源得到适当的释放。

重要的是,此方法与 Controller Broker 通信并异步执行。返回的 CreateTopicsResult 对象公开一个 KafkaFuture, 用于访问请求批次中每个项目的结果。这遵循 Java 异步编程模式,并允许调用者使用 Future.get 方法获取操作结果。

对于同步行为,我们可以立即调用此方法来检索操作结果。这会阻塞,直到操作完成或失败。如果失败,则会导致 ExecutionException 包含根本原因。

6.2.使用选项创建

我们还可以使用 Admin 的重载形式来代替默认选项。 createTopics 方法并 通过 CreateTopicsOptions 对象 提供一些选项。我们可以使用它们来修改创建新主题时的管理客户端行为:

CreateTopicsOptions topicOptions = new CreateTopicsOptions()
  .validateOnly(true)
  .retryOnQuotaViolation(false);

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic), topicOptions
);

在这里,我们将 validateOnly 选项设置为 true,这意味着客户端只会验证而不实际创建主题。同样, retryOnQuotaViolation 选项设置为 false,以便在配额违规时不会重试该操作。

6.3.新主题配置

Kafka 具有广泛的主题配置来控制主题行为,例如数据保留和压缩等。这些配置既有服务器默认值,也有可选的每个主题覆盖。

我们可以 通过使用新主题的配置映射来提供主题配置

// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");

NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
  .configs(newTopicConfig);

管理 API 中的 TopicConfig 类包含可用于在创建时配置主题的键。

7. 其他主题操作

除了创建新主题的功能外, Admin API 还具有删除列出描述主题的操作 。所有这些与主题相关的操作都遵循与我们在主题创建中看到的相同模式。

这些操作方法中的每一个都有一个重载版本,该版本将 xxxTopicOptions 对象作为输入。所有这些方法都返回相应的 xxxTopicsResult 对象。这反过来又提供了 KafkaFuture 来访问异步操作的结果。

最后,还值得一提的是,自从 Kafka 版本 0.11.0.0 中引入以来,管理 API 仍在不断发展,如 InterfaceStability.Evolving 注释所示。这意味着 API 将来可能会发生变化,并且次要版本可能会破坏兼容性。

八、结论

在本教程中,我们了解了如何使用 Java 管理客户端在 Kafka 中创建新主题。

最初,我们创建了一个带有默认选项的主题,然后带有显式选项。接下来,我们了解了如何使用各种属性来配置新主题。最后,我们简要介绍了使用管理客户端进行的其他与主题相关的操作。

在此过程中,我们还了解了如何使用 Testcontainers 通过测试来设置简单的单节点集群。

与往常一样,本文的完整源代码可以在 GitHub 上获取。