1. 简介
Spring WebFlux 为 Web 应用提供了响应式编程(Reactive Programming)的能力。其异步、非阻塞的设计显著提升了性能和内存使用效率。而 Project Reactor 则为高效管理数据流提供了核心支持。
但在响应式系统中,背压(Backpressure) 是一个绕不开的问题。本文将深入解释背压的含义,并介绍如何在 Spring WebFlux 中应用背压机制来缓解这一问题。
2. 响应式流中的背压
由于响应式编程的非阻塞特性,服务端不会一次性发送完整数据流,而是可以并发地在数据就绪时推送。这让客户端能更快地接收和处理事件。但随之而来的也有挑战。
2.1. 什么是背压?
✅ 在响应式流中,背压指的是如何控制数据流的传输速率,即控制接收方能消费多少元素。
举个例子:
- 系统包含三个组件:发布者(Publisher)、消费者(Consumer)和图形界面(GUI)
- Publisher 每秒发送 10000 个事件给 Consumer
- Consumer 处理这些事件并将结果传给 GUI
- GUI 负责展示结果给用户
- 但 Consumer 每秒只能处理 7500 个事件
在这种速率下,Consumer 无法及时处理所有事件,造成 背压。最终系统会崩溃,用户也无法看到结果。
2.2. 使用背压防止系统崩溃
面对这种情况,我们需要采取一些背压策略来防止系统性失败。主要目标是高效管理那些无法及时处理的事件:
- ✅ 控制数据流发送速率:这是首选方案。Publisher 应该放慢发送节奏,避免 Consumer 被压垮。但这种方式并不总是可行。
- ✅ 缓存多余数据:Consumer 临时缓存来不及处理的事件,等有能力时再处理。缺点是缓冲区可能无限增长,导致内存溢出。
- ✅ 丢弃多余事件:虽然不是理想方案,但至少能避免系统崩溃。
2.3. 控制背压的策略
我们主要关注 Publisher 如何控制事件发送速率,主要有三种策略:
- ✅ Pull 模式:只有当订阅者请求时,Publisher 才发送新事件
- ✅ Push 限流:Publisher 限制每次发送的事件数量
- ✅ 取消流传输:当 Consumer 无法处理更多事件时,可随时取消订阅
3. 在 Spring WebFlux 中处理背压
Spring WebFlux 提供了异步非阻塞的响应式流处理能力。它依赖 Project Reactor 来实现背压控制,内部使用 Flux 来管理事件流。
⚠️ WebFlux 通过 TCP 流控制来管理字节级的背压,但对逻辑事件的控制较弱。来看一下其内部机制:
- WebFlux 负责将事件转换为字节以便通过 TCP 传输
- Consumer 可能在处理事件时,未请求新数据
- 此时,WebFlux 仍会缓存字节数据,但不会发送新事件
- 但由于 TCP 特性,Publisher 仍可能继续发送数据
结论是:Spring WebFlux 并未完美处理服务间的逻辑背压。它只是分别处理了 Publisher 和 Consumer 的背压,但没有考虑它们之间的整体协调。
4. 在 Spring WebFlux 中实现背压机制
我们使用 Flux 来控制事件流的读写,实现背压支持。这样,当 Consumer 能力不足时,Producer 会自动减缓或暂停发送。
4.1. 依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
4.2. 控制请求频率(Request)
✅ 让 Consumer 控制自己能处理的事件数量。Publisher 会等待 Consumer 主动请求新事件。
@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);
request.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("All 50 items have been successfully processed!!!"),
subscription -> {
for (int i = 0; i < 5; i++) {
System.out.println("Requesting the next 10 elements!!!");
subscription.request(10);
}
}
);
StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.thenRequest(10)
.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.thenRequest(10)
.expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
.thenRequest(10)
.expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
.thenRequest(10)
.expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
.verifyComplete();
4.3. 限制请求速率(Limit)
✅ 使用 limitRate()
控制每次预取事件数量。即使 Subscriber 请求更多,也不会超过限制。
@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
Flux<Integer> limit = Flux.range(1, 25);
limit.limitRate(10);
limit.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("Finished!!"),
subscription -> subscription.request(15)
);
StepVerifier.create(limit)
.expectSubscription()
.thenRequest(15)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectNext(11, 12, 13, 14, 15)
.thenRequest(10)
.expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
.verifyComplete();
}
4.4. 取消订阅(Cancel)
✅ Consumer 可以在任意时刻取消订阅,中止事件接收。
@Test
public void whenCancel_thenSubscriptionFinished() {
Flux<Integer> cancel = Flux.range(1, 10).log();
cancel.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});
StepVerifier.create(cancel)
.expectNext(1, 2, 3)
.thenCancel()
.verify();
}
5. 总结
本文介绍了响应式编程中的背压问题,以及如何通过 Spring WebFlux 和 Project Reactor 来应对。通过合理的背压控制,系统可以在高并发下保持稳定性,避免因事件堆积导致的崩溃。
相关代码可在 GitHub 获取。