1. 概述
在Reactor响应式编程中,我们可以通过多种方式创建 Mono 或 Flux 类型的发布者。在这里,我们将了解如何使用 defer 方法来延迟 Mono 发布者的执行。
2. 什么是 Mono.defer 方法?
我们可以使用 Mono 的 defer 方法创建一个最多可以产生一个值的冷发布者(cold publisher)。我们看一下方法签名:
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)
defer 接受 Mono 发布者的 Supplier ,并在下游订阅时延迟返回该 Mono 。
但问题是,什么是冷发布者(cold publisher)或懒发布者(lazy publisher)?让我们来看看。
仅当消费者订阅时,才产生数据的被称为冷发布者。而热发布者则在任何订阅之前都会生成数据。 我们有 Mono.just() 方法,它提供了 Mono 类型的热发布者。
3. 它是如何运作的?
让我们探讨一个具有 Mono 类型的 Supplier 示例:
private Mono<String> sampleMsg(String str) {
log.debug("Call to Retrieve Sample Message!! --> {} at: {}", str, System.currentTimeMillis());
return Mono.just(str);
}
在这里,该方法返回一个热 Mono 发布者。:
public void whenUsingMonoJust_thenEagerEvaluation() throws InterruptedException {
Mono<String> msg = sampleMsg("Eager Publisher");
log.debug("Intermediate Test Message....");
StepVerifier.create(msg)
.expectNext("Eager Publisher")
.verifyComplete();
Thread.sleep(5000);
StepVerifier.create(msg)
.expectNext("Eager Publisher")
.verifyComplete();
}
执行时,我们可以在日志中看到以下内容:
20:44:30.250 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Eager Publisher at: 1622819670247
20:44:30.365 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:44:30.365 [main] DEBUG com.baeldung.mono.MonoUnitTest - Intermediate Test Message....
在这里,我们可以注意到:
- 根据指令顺序, main 线程立即执行了 sampleMsg 方法。
- 在使用 StepVerifier 的两个订阅上,main 线程使用相同的 sampleMsg 输出,即 sampleMsg 只被执行了一次。
让我们看看 Mono.defer() 如何将其转换为冷发布者:
public void whenUsingMonoDefer_thenLazyEvaluation() throws InterruptedException {
Mono<String> deferMsg = Mono.defer(() -> sampleMsg("Lazy Publisher"));
log.debug("Intermediate Test Message....");
StepVerifier.create(deferMsg)
.expectNext("Lazy Publisher")
.verifyComplete();
Thread.sleep(5000);
StepVerifier.create(deferMsg)
.expectNext("Lazy Publisher")
.verifyComplete();
}
执行该方法后,我们可以在控制台看到以下日志:
20:01:05.149 [main] DEBUG com.baeldung.mono.MonoUnitTest - Intermediate Test Message....
20:01:05.187 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Lazy Publisher at: 1622817065187
20:01:10.197 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Lazy Publisher at: 1622817070197
从日志中,我们可以注意到:
- StepVerifier 在每个订阅上执行方法 sampleMsg ,而不是在我们定义它时执行。
- 延迟5秒后,订阅方法 sampleMsg的 第二个消费者再次执行它。
这就是 defer 方法如何从热发布者变成冷发布者。
4. Mono.defer 使用场景?
让我们看一下可以使用 Mono.defer() 方法的场景:
- 当我们必须有条件地订阅发布者时
- 当每个订阅的执行可能产生不同的结果时
- deferContextual 可用于发布者当前基于上下文的评估
4.1.使用示例
让我们看一下使用条件 Mono.defer() 方法的一个示例:
public void whenEmptyList_thenMonoDeferExecuted() {
Mono<List<String>> emptyList = Mono.defer(() -> monoOfEmptyList());
//Empty list, hence Mono publisher in switchIfEmpty executed after condition evaluation
Flux<String> emptyListElements = emptyList.flatMapIterable(l -> l)
.switchIfEmpty(Mono.defer(() -> sampleMsg("EmptyList")))
.log();
StepVerifier.create(emptyListElements)
.expectNext("EmptyList")
.verifyComplete();
}
这里, MonopublishersampleMsg的Supplier 被放置在 switchIfEmpty 方法中进行条件执行。因此, sampleMsg 仅在延迟订阅时执行。
现在,让我们看一下非空列表的相同代码:
public void whenNonEmptyList_thenMonoDeferNotExecuted() {
Mono<List<String>> nonEmptyist = Mono.defer(() -> monoOfList());
//Non empty list, hence Mono publisher in switchIfEmpty won't evaluated.
Flux<String> listElements = nonEmptyist.flatMapIterable(l -> l)
.switchIfEmpty(Mono.defer(() -> sampleMsg("NonEmptyList")))
.log();
StepVerifier.create(listElements)
.expectNext("one", "two", "three", "four")
.verifyComplete();
}
这里, sampleMsg 没有被执行,因为它没有被订阅。
5. 结论
在本文中,我们讨论了 Mono.defer() 方法和热/冷发布者。另外,我们如何将热发布者转变为冷发布者。最后,我们还讨论了它与示例用例的配合。
与往常一样,代码示例可以在 GitHub 上找到。