1. 概述

在这个教程中,我们将探讨在Project Reactor中处理异常的几种方法。代码示例中引入的运算符定义在MonoFlux类中。然而,我们将主要关注Flux类中的方法

2. Maven依赖

首先,添加Reactor核心依赖项:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId
    <version>3.6.0</version>
</dependency>

3. 直接在管道运算符中抛出异常

处理异常的最简单方式是直接抛出。如果流元素处理过程中发生异常情况,我们可以使用throw关键字像正常方法执行一样抛出一个异常

假设我们需要将一个流的String转换为Integer。如果元素不是一个数值String,我们需要抛出一个异常。

通常的做法是使用map运算符进行此类转换:

Function<String, Integer> mapper = input -> {
    if (input.matches("\\D")) {
        throw new NumberFormatException();
    } else {
        return Integer.parseInt(input);
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

如我们所见,如果输入元素无效,运算符会抛出异常。当我们这样抛出异常时,Reactor捕获它并向下游发出错误信号

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

这种方法可行,但不优雅。根据Reactive Streams规范,规则2.13,运算符必须正常返回。Reactor通过将异常转换为错误信号帮了我们一把。然而,我们可以做得更好。

本质上,反应式流依赖于onError方法来指示失败条件。在大多数情况下,这种条件必须由对Publishererror方法的调用来触发。在这种情况下使用异常,又回到了传统的编程方式。

4. 在handle运算符中处理异常

类似于map运算符,我们可以使用handle运算符逐个处理流中的项目。不同的是,Reactor为handle运算符提供了一个输出槽,使我们能够应用更复杂的转换

让我们更新上一节的示例,使用handle运算符:

BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
    if (input.matches("\\D")) {
        sink.error(new NumberFormatException());
    } else {
        sink.next(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

map运算符不同,**handle运算符接收一个函数式消费者,每个元素只调用一次。这个消费者有两个参数:来自上游的元素和一个用于构建要发送到下游的输出的SynchronousSink**。

如果输入元素是数值String,我们在sink上调用next方法,提供从输入转换得到的Integer。如果不是数值String,我们会通过调用带有异常对象的error方法来表示这种情况。

请注意,调用error方法会取消对上游的订阅,并在下游调用onError方法。这是反应式流中处理异常的标准方式。

让我们验证输出流:

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

5. 在flatMap运算符中处理异常

另一个常用于支持错误处理的运算符是flatMaphttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-)。这个运算符将输入元素转换为Publisher,然后将这些Publisher合并成一个新的流。我们可以利用这些Publisher来表示错误状态。

让我们尝试使用flatMap来处理同一个例子:

Function<String, Publisher<Integer>> mapper = input -> {
    if (input.matches("\\D")) {
        return Mono.error(new NumberFormatException());
    } else {
        return Mono.just(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

不出所料,结果与之前相同。

注意关于错误处理,handleflatMap之间的唯一区别在于,handle运算符在sink上调用error方法,而flatMapPublisher上调用

如果我们正在处理由Flux对象表示的流,也可以使用concatMaphttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concatMap-java.util.function.Function-)来处理错误。此方法的行为与flatMap类似,但不支持异步处理。

6. 避免NullPointerException

本节讨论如何处理可能导致NullPointerExceptionnull引用,这是Java中常见的异常。为了避免这个异常,我们通常会比较变量是否为null,如果变量确实为null,则引导执行到不同的路径。在反应式流中做同样的事情很有诱惑力:

Function<String, Integer> mapper = input -> {
    if (input == null) {
        return 0;
    } else {
        return Integer.parseInt(input);
    }
};

我们可能会认为NullPointerException不会发生,因为我们已经处理了输入值为null的情况。然而,实际情况并非如此:

Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NullPointerException.class)
    .verify();

显然,一个NullPointerException触发了下游的错误,这意味着我们的null检查没有起作用

要理解为什么会这样,我们需要回到Reactive Streams规范。规范的规则2.13指出:“调用onSubscribeonNextonErroronComplete必须正常返回,除非提供的参数之一为null,在这种情况下,必须向调用者抛出java.lang.NullPointerException。”

**按照规范的要求,当null值到达map函数时,Reactor会抛出NullPointerException**。

因此,当某个流达到一定阶段时,我们无法处理或在传递给下游之前将其转换为非null值。因此,避免NullPointerException的唯一方法是确保null值不会进入管道

7. 总结

在这篇文章中,我们探讨了Project Reactor中的异常处理。我们讨论了一些示例并明确了过程。我们也涵盖了处理反应式流时可能遇到的一种特殊异常——NullPointerException

如往常一样,我们的应用程序源代码可以在GitHub上找到。