1. 概述
在这个教程中,我们将探讨在Spring 5 WebFlux中访问流式(Flux)的第一个元素的不同方法。
首先,我们将使用API的非阻塞方法,如next()
和take()
。然后,我们将学习如何借助elementAt()
方法实现相同的操作,其中需要指定索引。最后,我们将了解API的阻塞方法,并使用blockFirst()
来获取流式的第一项。
2. 测试设置
本文中的代码示例将使用Payment
类,它只有一个字段:支付金额:
public class Payment {
private final int amount;
// constructor and getter
}
在测试中,我们将使用名为fluxOfThreePayments
的测试辅助方法构造一个包含三个支付的流式:
private Flux<Payment> fluxOfThreePayments() {
return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}
之后,我们将使用Spring Reactor的[StepVerifier](/reactive-streams-step-verifier-test-publisher)
来测试结果。
3. next()
首先,我们尝试next()
方法。这个方法会返回流式的第一项,包装成reactive的Mono
类型:
@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
Mono<Payment> firstPayment = fluxOfThreePayments().next();
StepVerifier.create(firstPayment)
.expectNext(paymentOf100)
.verifyComplete();
}
另一方面,如果在空的流式上调用next()
,结果将是空的Mono
。因此,阻塞它将返回null
:
@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
Flux<Payment> emptyFlux = Flux.empty();
Mono<Payment> firstPayment = emptyFlux.next();
StepVerifier.create(firstPayment)
.verifyComplete();
}
4. take()
流式的take()
方法相当于Java 8流的limit()
。换句话说,我们可以使用take(1)
限制流式至恰好一个元素,然后以阻塞或非阻塞方式使用它:
@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);
StepVerifier.create(firstPaymentFlux)
.expectNext(paymentOf100)
.verifyComplete();
}
同样地,对于空流式,take(1)
将返回一个空流式:
@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
Flux<Payment> emptyFlux = Flux.empty();
Flux<Payment> firstPaymentFlux = emptyFlux.take(1);
StepVerifier.create(firstPaymentFlux)
.verifyComplete();
}
5. elementAt()
流式API还提供了elementAt()
方法。我们可以使用elementAt(0)
以非阻塞的方式获取流式的第一项:
@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);
StepVerifier.create(firstPayment)
.expectNext(paymentOf100)
.verifyComplete();
}
然而,如果传递给方法的索引大于流式发出的元素数量,将会抛出错误:
@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
Flux<Payment> emptyFlux = Flux.empty();
Mono<Payment> firstPayment = emptyFlux.elementAt(0);
StepVerifier.create(firstPayment)
.expectError(IndexOutOfBoundsException.class);
}
6. blockFirst()
另一种选择是使用blockFirst()
。然而,顾名思义,这是一个阻塞方法。因此,如果我们使用blockFirst()
,我们将离开reactive世界,失去其所有优势:
@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
Payment firstPayment = fluxOfThreePayments().blockFirst();
assertThat(firstPayment).isEqualTo(paymentOf100);
}
7. toStream()
最后,我们可以将流式转换为Java 8流,然后访问第一个元素:
@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
.findFirst();
assertThat(firstPayment).contains(paymentOf100);
}
但再次强调,如果我们这样做,就无法继续使用reactive管道。
8. 总结
在这篇文章中,我们讨论了Java的reactive流(Reactive Streams)API。我们看到了获取流式第一个元素的不同方法,并了解到,如果我们想利用reactive管道,应该坚持使用非阻塞解决方案。
如往常一样,文章中的代码可以在GitHub上找到。