一、简介

Apache Kafka是最流行的开源分布式容错流处理系统。 Kafka Consumer 提供了处理消息的基本功能。 Kafka Streams还在 Kafka Consumer 客户端之上提供实时流处理。

在本教程中,我们将解释 Kafka Streams 的功能,以使流处理体验变得简单轻松。

2. Streams 和 Consumer API 之间的区别

2.1.卡夫卡消费者API

简而言之, Kafka Consumer API允许应用程序处理来自主题的消息。 它提供了与它们交互的基本组件,包括以下功能

  • 消费者和生产者的责任分离
  • 单件加工
  • 批处理支持
  • 仅无状态支持。客户端不保留先前的状态并单独评估流中的每个记录
  • 编写应用程序需要大量代码
  • 不使用线程或并行性
  • 可以在多个Kafka集群中写入

2.2.卡夫卡流API

Kafka Streams极大地简化了主题的流处理。 它构建在 Kafka 客户端库之上,提供数据并行、分布式协调、容错和可扩展性 。它将消息作为无界、连续、实时的记录流来处理,具有以下特征:

  • 用于消费和生产的单个 Kafka Stream
  • 进行复杂的处理
  • 不支持批量处理
  • 支持无状态和有状态操作
  • 编写应用程序只需几行代码
  • 线程和并行性
  • 仅与单个 Kafka 集群交互
  • 将流分区和任务作为存储和传输消息的逻辑单元

Kafka Streams 使用分区和任务的概念 作为与主题分区紧密相关的逻辑单元 。此外,它使用线程在应用程序实例内并行处理。支持的另一个重要功能是状态存储,Kafka Streams 使用它来存储和查询来自主题的数据。最后,Kafka Streams API 与集群交互,但它并不直接在集群之上运行。

在接下来的部分中,我们将重点关注与基本 Kafka 客户端不同的四个方面:流表二元性、Kafka 流域特定语言 (DSL)、Exactly-Once 处理语义 (EOS) 和交互式查询。

2.3.依赖关系

为了实现这些示例,我们只需将Kafka Consumer APIKafka Streams API依赖项添加到 pom.xml 中:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
 </dependency>

3. 流表对偶性

Kafka Streams 支持流,但也支持可双向转换的表。 这就是所谓的流表二元性。表格是一组不断变化的事实。每个新事件都会覆盖旧事件,而流是不可变事实的集合。

流处理来自主题的完整数据流。表通过聚合来自流的信息来存储状态。让我们想象一下卡夫卡数据建模中描述的下棋游戏。连续移动的流被聚合到一个表中,我们可以从一种状态转换到另一种状态:

3.1. KStreamKTableGlobalKTable

Kafka Streams 为 Streams 和 Tables 提供了两种抽象KStream 处理记录流。另一方面, KTable 使用给定键的最新状态来管理变更日志流。每条数据记录代表一次更新。

对于未分区的表还有另一种抽象。我们可以使用 GlobalKTables 向所有任务广播信息或进行连接,而无需重新分区输入数据。

我们可以将主题读取并反序列化为流:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = 
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

还可以读取主题来跟踪以表格形式收到的最新单词:

KTable<String, String> textLinesTable = 
  builder.table(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

最后,我们可以使用全局表读取主题:

GlobalKTable<String, String> textLinesGlobalTable = 
  builder.globalTable(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

4.Kafka Streams DSL

Kafka Streams DSL 是一种声明式和函数式编程风格 。它构建在Streams Processor API之上。该语言为上一节中提到的流和表提供了内置抽象。

此外,它还支持无状态( 映射过滤器 等)和有状态转换( 聚合连接窗口 )。因此,只需几行代码就可以实现流处理操作。

4.1.无状态转换

无状态转换不需要状态来进行处理。同样,流处理器中也不需要状态存储。示例操作包括 filtermapflatMapgroupBy

现在让我们看看如何将值映射为大写,从主题中过滤它们并将它们存储为流:

KStream<String, String> textLinesUpperCase =
  textLines
    .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
    .filter((key, value) -> value.contains("FILTER"));

4.2.状态转换

有状态转换依赖于状态来完成处理操作。消息的处理取决于其他消息(状态存储)的处理。换句话说,任何表或状态存储都可以使用更改日志主题来恢复。

有状态转换的一个例子是字数统计算法:

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(value
    .toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

我们将把这两个字符串发送到主题:

String TEXT_EXAMPLE_1 = "test test and test";
String TEXT_EXAMPLE_2 = "test filter filter this sentence";

结果是:

Word: and -> 1
Word: test -> 4
Word: filter -> 2
Word: this -> 1
Word: sentence -> 1

DSL 涵盖了多种转换功能。我们可以 连接 或合并具有相同键的两个输入流/表以生成新的流/表。我们还能够 聚合 或合并流/表中的多个记录到新表中的一条记录中。最后,可以应用 窗口化 ,对连接或聚合函数中具有相同键的记录进行分组。

使用 5 秒窗口连接的示例会将两个流中按键分组的记录合并到一个流中:

KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
  (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofSeconds(5))).groupByKey()
      .reduce(((key, lastValue) -> lastValue))
  .toStream();

因此,我们将输入左流 value=leftkey=1 以及右流 value=rightkey=2 。结果如下:

(key= 1) -> (left=left, right=null)
(key= 2) -> (left=null, right=right)

对于聚合示例,我们将计算单词计数算法,但使用每个单词的前两个字母作为键:

KTable<String, Long> aggregated = input
  .groupBy((key, value) -> (value != null && value.length() > 0)
    ? value.substring(0, 2).toLowerCase() : "",
    Grouped.with(Serdes.String(), Serdes.String()))
  .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
    Materialized.with(Serdes.String(), Serdes.Long()));

具有以下条目:

"one", "two", "three", "four", "five"

输出是:

Word: on -> 3
Word: tw -> 3
Word: th -> 5
Word: fo -> 4
Word: fi -> 4

5. 一次性处理语义(EOS)

有时我们需要确保消费者只读取消息一次。 Kafka 引入了将消息包含到事务中的功能,以通过Transactional API实现 EOS。 Kafka Streams 从 0.11.0 版本开始涵盖了相同的功能。

要在 Kafka Streams 中配置 EOS,我们将包含以下属性:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
  StreamsConfig.EXACTLY_ONCE);

6. 互动查询

交互式查询允许查询分布式环境中应用程序的状态 。这意味着能够从本地存储以及多个实例上的远程存储中提取信息。基本上,我们将收集所有商店并将它们分组在一起以获得应用程序的完整状态。

让我们看一个使用交互式查询的示例。首先,我们将定义处理拓扑,在我们的例子中是字数统计算法:

KStream<String, String> textLines = 
  builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

final KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

接下来,我们将为所有计算出的字数创建一个状态存储(键值):

groupedByWord
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
  .withValueSerde(Serdes.Long()));

然后,我们可以查询键值存储:

ReadOnlyKeyValueStore<String, Long> keyValueStore =
  streams.store(StoreQueryParameters.fromNameAndType(
    "WordCountsStore", QueryableStoreTypes.keyValueStore()));

KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
    KeyValue<String, Long> next = range.next();
    System.out.println("count for " + next.key + ": " + next.value);
}

该示例的输出如下:

Count for and: 1
Count for filter: 2
Count for sentence: 1
Count for test: 4
Count for this: 1

七、结论

在本教程中,我们展示了 Kafka Streams 在从 Kafka 主题检索消息时如何简化处理操作。它极大地简化了处理 Kafka 中的流时的实现。不仅适用于无状态处理,还适用于有状态转换。

当然,不使用Kafka Streams也可以完美构建消费者应用程序。但我们需要手动实现一堆免费提供的额外功能。

与往常一样,代码可以在 GitHub 上获取。