1. 简介

Spring WebFlux 为 Web 应用提供了响应式编程(Reactive Programming)的能力。其异步、非阻塞的设计显著提升了性能和内存使用效率。而 Project Reactor 则为高效管理数据流提供了核心支持。

但在响应式系统中,背压(Backpressure) 是一个绕不开的问题。本文将深入解释背压的含义,并介绍如何在 Spring WebFlux 中应用背压机制来缓解这一问题。

2. 响应式流中的背压

由于响应式编程的非阻塞特性,服务端不会一次性发送完整数据流,而是可以并发地在数据就绪时推送。这让客户端能更快地接收和处理事件。但随之而来的也有挑战。

2.1. 什么是背压?

在响应式流中,背压指的是如何控制数据流的传输速率,即控制接收方能消费多少元素。

举个例子:

  • 系统包含三个组件:发布者(Publisher)、消费者(Consumer)和图形界面(GUI)
  • Publisher 每秒发送 10000 个事件给 Consumer
  • Consumer 处理这些事件并将结果传给 GUI
  • GUI 负责展示结果给用户
  • 但 Consumer 每秒只能处理 7500 个事件

Screenshot-2021-02-18-at-13.10.26

在这种速率下,Consumer 无法及时处理所有事件,造成 背压。最终系统会崩溃,用户也无法看到结果。

2.2. 使用背压防止系统崩溃

面对这种情况,我们需要采取一些背压策略来防止系统性失败。主要目标是高效管理那些无法及时处理的事件:

  • 控制数据流发送速率:这是首选方案。Publisher 应该放慢发送节奏,避免 Consumer 被压垮。但这种方式并不总是可行。
  • 缓存多余数据:Consumer 临时缓存来不及处理的事件,等有能力时再处理。缺点是缓冲区可能无限增长,导致内存溢出。
  • 丢弃多余事件:虽然不是理想方案,但至少能避免系统崩溃。

Screenshot-2021-02-18-at-16.48.38

2.3. 控制背压的策略

我们主要关注 Publisher 如何控制事件发送速率,主要有三种策略:

  • Pull 模式:只有当订阅者请求时,Publisher 才发送新事件
  • Push 限流:Publisher 限制每次发送的事件数量
  • 取消流传输:当 Consumer 无法处理更多事件时,可随时取消订阅

Screenshot-2021-02-25-at-16.51.46

3. 在 Spring WebFlux 中处理背压

Spring WebFlux 提供了异步非阻塞的响应式流处理能力。它依赖 Project Reactor 来实现背压控制,内部使用 Flux 来管理事件流。

⚠️ WebFlux 通过 TCP 流控制来管理字节级的背压,但对逻辑事件的控制较弱。来看一下其内部机制:

  • WebFlux 负责将事件转换为字节以便通过 TCP 传输
  • Consumer 可能在处理事件时,未请求新数据
  • 此时,WebFlux 仍会缓存字节数据,但不会发送新事件
  • 但由于 TCP 特性,Publisher 仍可能继续发送数据

Screenshot-2021-03-19-at-16.40.30

结论是:Spring WebFlux 并未完美处理服务间的逻辑背压。它只是分别处理了 Publisher 和 Consumer 的背压,但没有考虑它们之间的整体协调。

4. 在 Spring WebFlux 中实现背压机制

我们使用 Flux 来控制事件流的读写,实现背压支持。这样,当 Consumer 能力不足时,Producer 会自动减缓或暂停发送。

4.1. 依赖配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

4.2. 控制请求频率(Request)

让 Consumer 控制自己能处理的事件数量。Publisher 会等待 Consumer 主动请求新事件。

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
    Flux request = Flux.range(1, 50);

    request.subscribe(
      System.out::println,
      err -> err.printStackTrace(),
      () -> System.out.println("All 50 items have been successfully processed!!!"),
      subscription -> {
          for (int i = 0; i < 5; i++) {
              System.out.println("Requesting the next 10 elements!!!");
              subscription.request(10);
          }
      }
    );

    StepVerifier.create(request)
      .expectSubscription()
      .thenRequest(10)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .thenRequest(10)
      .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
      .thenRequest(10)
      .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
      .thenRequest(10)
      .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
      .thenRequest(10)
      .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
      .verifyComplete();

4.3. 限制请求速率(Limit)

✅ 使用 limitRate() 控制每次预取事件数量。即使 Subscriber 请求更多,也不会超过限制。

@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
    Flux<Integer> limit = Flux.range(1, 25);

    limit.limitRate(10);
    limit.subscribe(
      value -> System.out.println(value),
      err -> err.printStackTrace(),
      () -> System.out.println("Finished!!"),
      subscription -> subscription.request(15)
    );

    StepVerifier.create(limit)
      .expectSubscription()
      .thenRequest(15)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .expectNext(11, 12, 13, 14, 15)
      .thenRequest(10)
      .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
      .verifyComplete();
}

4.4. 取消订阅(Cancel)

✅ Consumer 可以在任意时刻取消订阅,中止事件接收。

@Test
public void whenCancel_thenSubscriptionFinished() {
    Flux<Integer> cancel = Flux.range(1, 10).log();

    cancel.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnNext(Integer value) {
            request(3);
            System.out.println(value);
            cancel();
        }
    });

    StepVerifier.create(cancel)
      .expectNext(1, 2, 3)
      .thenCancel()
      .verify();
}

5. 总结

本文介绍了响应式编程中的背压问题,以及如何通过 Spring WebFlux 和 Project Reactor 来应对。通过合理的背压控制,系统可以在高并发下保持稳定性,避免因事件堆积导致的崩溃。

相关代码可在 GitHub 获取。


原始标题:Backpressure Mechanism in Spring WebFlux | Baeldung

« 上一篇: Spring TLS/HTTPS配置
» 下一篇: Java Weekly, 第380期