1. 概述

Reactor响应式编程中,我们可以通过多种方式创建 MonoFlux 类型的发布者。在这里,我们将了解如何使用 defer 方法来延迟 Mono 发布者的执行。

2. 什么是 Mono.defer 方法?

我们可以使用 Monodefer 方法创建一个最多可以产生一个值的冷发布者(cold publisher)。我们看一下方法签名:

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

defer 接受 Mono 发布者的 Supplier ,并在下游订阅时延迟返回该 Mono

但问题是,什么是冷发布者(cold publisher)或懒发布者(lazy publisher)?让我们来看看。

仅当消费者订阅时,才产生数据的被称为冷发布者。而热发布者则在任何订阅之前都会生成数据。 我们有 Mono.just() 方法,它提供了 Mono 类型的热发布者。

3. 它是如何运作的?

让我们探讨一个具有 Mono 类型的 Supplier 示例:

private Mono<String> sampleMsg(String str) {
    log.debug("Call to Retrieve Sample Message!! --> {} at: {}", str, System.currentTimeMillis());
    return Mono.just(str);
}

在这里,该方法返回一个热 Mono 发布者。:

public void whenUsingMonoJust_thenEagerEvaluation() throws InterruptedException {

    Mono<String> msg = sampleMsg("Eager Publisher");

    log.debug("Intermediate Test Message....");

    StepVerifier.create(msg)
      .expectNext("Eager Publisher")
      .verifyComplete();

    Thread.sleep(5000);

    StepVerifier.create(msg)
      .expectNext("Eager Publisher")
      .verifyComplete();
}

执行时,我们可以在日志中看到以下内容:

20:44:30.250 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Eager Publisher at: 1622819670247
20:44:30.365 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:44:30.365 [main] DEBUG com.baeldung.mono.MonoUnitTest - Intermediate Test Message....

在这里,我们可以注意到:

  • 根据指令顺序, main 线程立即执行了 sampleMsg 方法。
  • 在使用 StepVerifier 的两个订阅上,main 线程使用相同的 sampleMsg 输出,即 sampleMsg 只被执行了一次。

让我们看看 Mono.defer() 如何将其转换为冷发布者:

public void whenUsingMonoDefer_thenLazyEvaluation() throws InterruptedException {

    Mono<String> deferMsg = Mono.defer(() -> sampleMsg("Lazy Publisher"));

    log.debug("Intermediate Test Message....");

    StepVerifier.create(deferMsg)
      .expectNext("Lazy Publisher")
      .verifyComplete();

    Thread.sleep(5000);

    StepVerifier.create(deferMsg)
      .expectNext("Lazy Publisher")
      .verifyComplete();

}

执行该方法后,我们可以在控制台看到以下日志:

20:01:05.149 [main] DEBUG com.baeldung.mono.MonoUnitTest - Intermediate Test Message....
20:01:05.187 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Lazy Publisher at: 1622817065187
20:01:10.197 [main] DEBUG com.baeldung.mono.MonoUnitTest - Call to Retrieve Sample Message!! --> Lazy Publisher at: 1622817070197

从日志中,我们可以注意到:

  • StepVerifier 在每个订阅上执行方法 sampleMsg ,而不是在我们定义它时执行。
  • 延迟5秒后,订阅方法 sampleMsg的 第二个消费者再次执行它。

这就是 defer 方法如何从热发布者变成冷发布者。

4. Mono.defer 使用场景?

让我们看一下可以使用 Mono.defer() 方法的场景:

  • 当我们必须有条件地订阅发布者时
  • 当每个订阅的执行可能产生不同的结果时
  • deferContextual 可用于发布者当前基于上下文的评估

4.1.使用示例

让我们看一下使用条件 Mono.defer() 方法的一个示例:

public void whenEmptyList_thenMonoDeferExecuted() {

    Mono<List<String>> emptyList = Mono.defer(() -> monoOfEmptyList());

    //Empty list, hence Mono publisher in switchIfEmpty executed after condition evaluation
    Flux<String> emptyListElements = emptyList.flatMapIterable(l -> l)
      .switchIfEmpty(Mono.defer(() -> sampleMsg("EmptyList")))
      .log();

    StepVerifier.create(emptyListElements)
      .expectNext("EmptyList")
      .verifyComplete();
}

这里, MonopublishersampleMsg的Supplier 被放置在 switchIfEmpty 方法中进行条件执行。因此, sampleMsg 仅在延迟订阅时执行。

现在,让我们看一下非空列表的相同代码:

public void whenNonEmptyList_thenMonoDeferNotExecuted() {

    Mono<List<String>> nonEmptyist = Mono.defer(() -> monoOfList());

    //Non empty list, hence Mono publisher in switchIfEmpty won't evaluated.
    Flux<String> listElements = nonEmptyist.flatMapIterable(l -> l)
      .switchIfEmpty(Mono.defer(() -> sampleMsg("NonEmptyList")))
      .log();

    StepVerifier.create(listElements)
      .expectNext("one", "two", "three", "four")
      .verifyComplete();
}

这里, sampleMsg 没有被执行,因为它没有被订阅。

5. 结论

在本文中,我们讨论了 Mono.defer() 方法和热/冷发布者。另外,我们如何将热发布者转变为冷发布者。最后,我们还讨论了它与示例用例的配合。

与往常一样,代码示例可以在 GitHub 上找到。