1. 概述
在这个教程中,我们将探讨在Project Reactor中处理异常的几种方法。代码示例中引入的运算符定义在Mono
和Flux
类中。然而,我们将主要关注Flux
类中的方法。
2. Maven依赖
首先,添加Reactor核心依赖项:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId
<version>3.6.0</version>
</dependency>
3. 直接在管道运算符中抛出异常
处理异常的最简单方式是直接抛出。如果流元素处理过程中发生异常情况,我们可以使用throw
关键字像正常方法执行一样抛出一个异常。
假设我们需要将一个流的String
转换为Integer
。如果元素不是一个数值String
,我们需要抛出一个异常。
通常的做法是使用map
运算符进行此类转换:
Function<String, Integer> mapper = input -> {
if (input.matches("\\D")) {
throw new NumberFormatException();
} else {
return Integer.parseInt(input);
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);
如我们所见,如果输入元素无效,运算符会抛出异常。当我们这样抛出异常时,Reactor捕获它并向下游发出错误信号:
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
这种方法可行,但不优雅。根据Reactive Streams规范,规则2.13,运算符必须正常返回。Reactor通过将异常转换为错误信号帮了我们一把。然而,我们可以做得更好。
本质上,反应式流依赖于onError
方法来指示失败条件。在大多数情况下,这种条件必须由对Publisher
的error
方法的调用来触发。在这种情况下使用异常,又回到了传统的编程方式。
4. 在handle
运算符中处理异常
类似于map
运算符,我们可以使用handle
运算符逐个处理流中的项目。不同的是,Reactor为handle
运算符提供了一个输出槽,使我们能够应用更复杂的转换。
让我们更新上一节的示例,使用handle
运算符:
BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
if (input.matches("\\D")) {
sink.error(new NumberFormatException());
} else {
sink.next(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);
与map
运算符不同,**handle
运算符接收一个函数式消费者,每个元素只调用一次。这个消费者有两个参数:来自上游的元素和一个用于构建要发送到下游的输出的SynchronousSink
**。
如果输入元素是数值String
,我们在sink上调用next
方法,提供从输入转换得到的Integer
。如果不是数值String
,我们会通过调用带有异常对象的error
方法来表示这种情况。
请注意,调用error
方法会取消对上游的订阅,并在下游调用onError
方法。这是反应式流中处理异常的标准方式。
让我们验证输出流:
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
5. 在flatMap
运算符中处理异常
另一个常用于支持错误处理的运算符是flatMap
(https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-)。这个运算符将输入元素转换为Publisher
,然后将这些Publisher
合并成一个新的流。我们可以利用这些Publisher
来表示错误状态。
让我们尝试使用flatMap
来处理同一个例子:
Function<String, Publisher<Integer>> mapper = input -> {
if (input.matches("\\D")) {
return Mono.error(new NumberFormatException());
} else {
return Mono.just(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
不出所料,结果与之前相同。
注意关于错误处理,handle
和flatMap
之间的唯一区别在于,handle
运算符在sink上调用error
方法,而flatMap
在Publisher
上调用。
如果我们正在处理由Flux
对象表示的流,也可以使用concatMap
(https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concatMap-java.util.function.Function-)来处理错误。此方法的行为与flatMap
类似,但不支持异步处理。
6. 避免NullPointerException
本节讨论如何处理可能导致NullPointerException
的null
引用,这是Java中常见的异常。为了避免这个异常,我们通常会比较变量是否为null
,如果变量确实为null
,则引导执行到不同的路径。在反应式流中做同样的事情很有诱惑力:
Function<String, Integer> mapper = input -> {
if (input == null) {
return 0;
} else {
return Integer.parseInt(input);
}
};
我们可能会认为NullPointerException
不会发生,因为我们已经处理了输入值为null
的情况。然而,实际情况并非如此:
Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NullPointerException.class)
.verify();
显然,一个NullPointerException
触发了下游的错误,这意味着我们的null
检查没有起作用。
要理解为什么会这样,我们需要回到Reactive Streams规范。规范的规则2.13指出:“调用onSubscribe
、onNext
、onError
或onComplete
必须正常返回,除非提供的参数之一为null
,在这种情况下,必须向调用者抛出java.lang.NullPointerException
。”
**按照规范的要求,当null
值到达map
函数时,Reactor会抛出NullPointerException
**。
因此,当某个流达到一定阶段时,我们无法处理或在传递给下游之前将其转换为非null
值。因此,避免NullPointerException
的唯一方法是确保null
值不会进入管道。
7. 总结
在这篇文章中,我们探讨了Project Reactor中的异常处理。我们讨论了一些示例并明确了过程。我们也涵盖了处理反应式流时可能遇到的一种特殊异常——NullPointerException
。
如往常一样,我们的应用程序源代码可以在GitHub上找到。