1. 概述

本教程介绍Project Reactor中的 mapflatMap 运算符。它们在 MonoFlux 类中定义,用于在处理流时转换项目。

在下面的部分中, 我们将重点关注 Flux 类中的 mapflatMap 方法Mono 类中同名的那些以同样的方式工作。

2.Maven依赖

要编写一些代码示例,我们需要Reactor 核心依赖项

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.9.RELEASE</version>
</dependency>

3. 地图 操作员

现在,让我们看看如何使用 地图 运算符。

Flux#map 方法需要一个 Function 参数,该参数可以简单如下:

Function<String, String> mapper = String::toUpperCase;

该映射器将字符串转换为其大写版本。我们可以将它应用到 Flux 流上:

Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.map(mapper);

给定的映射器 将输入流中的每个项目转换为输出中的新项目,并保留顺序

我们来证明一下:

StepVerifier.create(outFlux)
  .expectNext("BAELDUNG", ".", "COM")
  .expectComplete()
  .verify();

请注意,调用 map 方法时,不会执行映射器函数。相反, 它在我们订阅流时运行

4. flatMap 操作符

现在是时候转向 flatMap 运算符了。

4.1.代码示例

map 类似, flatMap 运算符有一个 Function 类型的参数。但是,与使用 map 的 函数不同, flatMap 映射器函数 将输入项转换为 Publisher 而不是普通对象。

这是一个例子:

Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split(""));

在这种情况下,映射器函数将字符串转换为其大写版本,然后将其拆分为单独的字符。最后,该函数根据这些字符构建一个新的流。

我们现在可以将给定的映射器传递给 flatMap 方法:

Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.flatMap(mapper);

我们看到的平面映射操作使用三个字符串项从上游创建三个新流。之后,这三个流中的元素被分割并交织在一起形成另一个新流。最终的流包含来自所有三个输入字符串的字符。

然后我们可以订阅这个新形成的流来触发管道并验证输出:

List<String> output = new ArrayList<>();
outFlux.subscribe(output::add);
assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M");

请注意,由于来自不同来源的项目交错, 它们在输出中的顺序可能与我们在输入中看到的顺序不同

4.2.管道操作说明

我们刚刚定义了一个映射器,将其传递给一个 flatMap 运算符,并在流上调用该运算符。现在是时候深入研究细节并了解输出中的项目可能无序的原因了。

首先,我们要明确的是, 在流被订阅之前不会发生任何操作 。当发生这种情况时,管道执行并调用传递给 flatMap 方法的映射器函数。

此时,映射器对输入流中的元素执行必要的转换。 这些元素中的每一个都可以转换为多个项目,然后用于创建新的流 。在我们的代码示例中,表达式Flux.just(s.toUpperCase().split(""))的值指示这样的流。

一旦新的流(由 Publisher 实例表示)准备就绪, flatMap 就会急切地订阅。 操作员不会等待发布者完成即可继续下一个流 ,这意味着订阅是非阻塞的。

由于管道同时处理所有派生流,因此它们的项目可能随时进入。结果,原来的顺序就丢失了。如果项目的顺序很重要,请考虑使用 flatMapSequential 运算符。

5.mapflatMap 的区别

到目前为止,我们已经介绍了 mapflatMap 运算符。让我们总结一下它们之间的主要区别。

5.1.一对一与一对多

map 运算符对流元素应用一对一转换,而 flatMap 则执行一对多转换 。查看方法签名时,这种区别很明显:

  • 通量 map(Function<? super T, ? extends V> mapper) – 映射器将 T 类型的单个值转换为 V 类型的单个值
  • 通量 flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) – 映射器将 T 类型的单个值转换为 R 类型元素的 Publisher

我们可以看到,从功能上来说,Project Reactor 中的 mapflatMap 之间的区别类似于Java Stream API 中 mapflatMap 之间的区别。

5.2.同步与异步

以下是 Reactor Core 库 API 规范的两个摘录:

  • map :通过对每个项目应用异步函数来转换此 Flux 发出的项目
  • flatMap :将此 Flux 发出的元素异步转换为 Publishers

很容易看出 map 是一个同步运算符 ——它只是一种将一个值转换为另一个值的方法。此方法与调用者在同一线程中执行。

另一种说法——flatMap 异步的 ——不太清楚。事实上,元素到 发布者 的转换可以是同步的,也可以是异步的。

在我们的示例代码中,该操作是同步的,因为我们使用 Flux#just 方法发出元素。但是, 在处理引入高延迟的源(例如远程服务器)时,异步处理是更好的选择

重要的一点是,管道并不关心元素来自哪个线程——它只关注发布者本身。

六,结论

在本文中,我们介绍了 Project Reactor 中的 mapflatMap 运算符。我们讨论了几个例子并阐明了这个过程。

与往常一样,我们的应用程序的源代码可以在 GitHub 上获取。