1. 概述

在这个教程中,我们将探讨在Spring 5 WebFlux中访问流式(Flux)的第一个元素的不同方法。

首先,我们将使用API的非阻塞方法,如next()take()。然后,我们将学习如何借助elementAt()方法实现相同的操作,其中需要指定索引。最后,我们将了解API的阻塞方法,并使用blockFirst()来获取流式的第一项。

2. 测试设置

本文中的代码示例将使用Payment类,它只有一个字段:支付金额:

public class Payment {
    private final int amount;
    // constructor and getter
}

在测试中,我们将使用名为fluxOfThreePayments的测试辅助方法构造一个包含三个支付的流式:

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

之后,我们将使用Spring Reactor的[StepVerifier](/reactive-streams-step-verifier-test-publisher)来测试结果。

3. next()

首先,我们尝试next()方法。这个方法会返回流式的第一项,包装成reactive的Mono类型:

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

另一方面,如果在空的流式上调用next(),结果将是空的Mono。因此,阻塞它将返回null

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.next();

    StepVerifier.create(firstPayment)
      .verifyComplete();
}

4. take()

流式的take()方法相当于Java 8流的limit()。换句话说,我们可以使用take(1)限制流式至恰好一个元素,然后以阻塞或非阻塞方式使用它:

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);

    StepVerifier.create(firstPaymentFlux)
      .expectNext(paymentOf100)
      .verifyComplete();
}

同样地,对于空流式,take(1)将返回一个空流式:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();

    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);

    StepVerifier.create(firstPaymentFlux)
      .verifyComplete();
}

5. elementAt()

流式API还提供了elementAt()方法。我们可以使用elementAt(0)以非阻塞的方式获取流式的第一项:

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

然而,如果传递给方法的索引大于流式发出的元素数量,将会抛出错误:

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.elementAt(0);

    StepVerifier.create(firstPayment)
      .expectError(IndexOutOfBoundsException.class);
}

6. blockFirst()

另一种选择是使用blockFirst()。然而,顾名思义,这是一个阻塞方法。因此,如果我们使用blockFirst(),我们将离开reactive世界,失去其所有优势:

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();

    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. toStream()

最后,我们可以将流式转换为Java 8流,然后访问第一个元素:

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
      .findFirst();

    assertThat(firstPayment).contains(paymentOf100);
}

但再次强调,如果我们这样做,就无法继续使用reactive管道。

8. 总结

在这篇文章中,我们讨论了Java的reactive流(Reactive Streams)API。我们看到了获取流式第一个元素的不同方法,并了解到,如果我们想利用reactive管道,应该坚持使用非阻塞解决方案。

如往常一样,文章中的代码可以在GitHub上找到。