1. 引言

本文深入探讨使用 Spring WebFlux 编写的响应式程序中的并发机制。

我们将从响应式编程与并发的关系讲起,接着分析 Spring WebFlux 如何在不同响应式服务器库之上提供并发抽象。目标是帮助有经验的开发者真正理解底层线程模型,避免踩坑。

2. 响应式编程的动机

典型的 Web 应用由多个复杂且相互协作的组件构成。其中很多交互本质上是阻塞的,比如数据库查询或更新操作。但也有大量操作是相互独立的,完全可以并发甚至并行执行。

例如,Web 服务器可以用不同线程处理两个用户请求。在多核平台上,这能显著降低整体响应时间。这种模式被称为 “每请求一线程”(thread-per-request)模型

Thread per Request Model

如图所示,每个线程一次只处理一个请求。

虽然基于线程的并发解决了部分问题,但它并未解决单个线程内多数操作仍是阻塞的这一事实。更关键的是,Java 的原生线程在上下文切换时开销巨大。

随着 Web 应用面临越来越多的请求,“每请求一线程”模型逐渐力不从心。因此,我们需要一种能用更少线程处理更多请求的并发模型——这正是采用 响应式编程 的核心动机之一。

3. 响应式编程中的并发

响应式编程 的核心是将程序结构化为数据流,并管理变化的传播。在一个完全非阻塞的环境中,它能以更少的资源实现更高的并发。

但响应式编程是否完全抛弃了线程?不完全是。它的不同之处在于对线程的使用方式——核心在于异步性

程序流从一连串同步操作,转变为异步事件流。例如,在响应式模型中,数据库读取调用不会阻塞当前线程,而是立即返回一个发布者(Publisher),供其他组件订阅。订阅者在事件发生后处理数据,甚至可以生成新的事件:

Reactive Model

最关键的是,响应式编程并不关心事件在哪个线程上生成或消费,重点在于将程序结构化为异步事件流。发布者和订阅者无需在同一线程运行,这有助于更高效地利用线程资源,从而提升整体并发能力。

4. 事件循环(Event Loop)

有多种编程模型支持响应式并发,本节介绍其中一种核心模型:事件循环(Event Loop)模型

Event Loop

这是一个典型的事件循环抽象设计:

  • 事件循环在单个线程中持续运行,但可根据 CPU 核心数创建多个事件循环。
  • ✅ 它从事件队列中顺序处理事件,注册回调后立即返回。
  • ✅ 底层平台(如网络 I/O、数据库)完成操作后会通知事件循环。
  • ✅ 事件循环触发回调,将结果返回给原始调用方。

该模型被 Node.jsNettyNginx 等平台广泛采用,相比 Apache HTTP ServerTomcat 等传统阻塞式服务器,具备更强的可扩展性。

5. Spring WebFlux 中的响应式编程

Spring WebFlux 是 Spring 5.0 引入的响应式栈 Web 框架。我们来看它的服务端技术栈如何与传统栈并存:

Spring Web Stack

关键点:

  • ✅ WebFlux 并非取代传统 Spring MVC,而是并行存在。
  • ✅ 扩展了注解式编程模型,支持函数式路由。
  • ✅ 通过 Reactive Streams API 适配多种 HTTP 运行时,实现运行时互操作。
  • ✅ 支持多种响应式运行时:Servlet 3.1+(Tomcat)、Reactor Netty、Undertow 等。
  • ✅ 内置 WebClient —— 一个非阻塞、响应式的 HTTP 客户端,提供函数式流畅 API。

6. 支持的运行时中的线程模型

响应式程序通常只使用少量线程,但具体线程数量和行为取决于所选的响应式运行时。

Spring WebFlux 通过 HttpHandler API 抽象不同服务器的实现,实现运行时无关性。该接口仅一个方法,统一了 Reactor Netty、Servlet 3.1、Undertow 等底层差异。

可以通过排除依赖切换服务器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

查看 JVM 中线程的简单方式:

Thread.getAllStackTraces()
  .keySet()
  .stream()
  .collect(Collectors.toList());

6.1. Reactor Netty

Reactor Netty 是 Spring Boot WebFlux 的默认嵌入式服务器。启动一个空的 WebFlux 应用,观察其创建的线程:

1

可见 Netty 创建了多个工作线程(数量通常等于 CPU 核心数),以及一些 JVM 常驻线程。

Netty 使用事件循环模型实现高并发:

Netty Threading Model

  • EventLoopGroup 管理一个或多个持续运行的 EventLoop
  • ⚠️ 不建议创建超过 CPU 核心数的 EventLoop
  • ✅ 每个 Channel 生命周期内,所有操作由同一个 EventLoop 线程执行。

6.2. Apache Tomcat

Spring WebFlux 也支持传统 Servlet 容器如 Apache Tomcat,依赖 Servlet 3.1 的非阻塞 I/O 特性。

在 Tomcat 上运行 WebFlux 应用,线程情况如下:

2

与 Netty 不同,Tomcat 默认启动更多工作线程(默认 10 个)。

其 NIO Connector 架构包含三个核心组件:

Tomcat NIO Connector

  • Acceptor:接收新连接,通常 1 个线程。
  • Poller:监听 I/O 事件,可配置多个线程。
  • Worker:处理请求业务逻辑,使用独立线程池。

7. WebClient 的线程模型

WebClient 是 Spring WebFlux 提供的响应式 HTTP 客户端,用于构建端到端响应式应用。

7.1. 使用 WebClient

无需额外依赖,直接使用:

@GetMapping("/index")
public Mono<String> getIndex() {
    return Mono.just("Hello World!");
}

用 WebClient 调用该接口:

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .doOnNext(s -> printThreads());

7.2. 线程模型解析

  • WebClient 默认也采用事件循环模型
  • ✅ 在 Reactor Netty 上运行时,与服务器共享同一个事件循环,线程无明显变化。
  • ⚠️ 在 Jetty 等 Servlet 容器上,WebClient 需创建独立的事件循环,因此会新增线程:

3

  • 为 WebClient 分配独立线程池可提升性能,避免客户端操作阻塞服务器事件循环。

8. 数据访问库的线程模型

典型应用常需连接数据库、消息中间件等。传统库多为阻塞式,但响应式库正快速普及。

8.1. Spring Data MongoDB

通过添加依赖启用响应式 MongoDB 支持:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

定义响应式仓库:

public interface PersonRepository extends ReactiveMongoRepository<Person, ObjectId> {
}
// 使用
personRepository.findAll().doOnComplete(this::printThreads);
  • Spring Data 响应式仓库默认复用服务器的事件循环,在 Netty 上运行时线程无显著变化。

8.2. Reactor Kafka

Spring 官方对响应式 Kafka 的支持仍在完善,但可直接使用 Reactor Kafka

添加依赖:

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.10</version>
</dependency>

非阻塞生产消息:

SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux
  .range(1, 10)
  .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();

非阻塞消费消息:

ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads);

线程情况:

4

  • Reactor Kafka 管理独立的线程池,用于 Kafka 消息的收发处理。
  • ✅ 生产者使用专用网络线程发送请求,响应通过单线程调度器返回。
  • ✅ 消费者每组一个线程监听消息,实际处理由另一线程池执行。

9. WebFlux 中的调度选项

响应式编程在完全非阻塞环境下表现优异,但一旦出现阻塞操作,可能冻结整个事件循环,导致严重性能问题。

9.1. Reactor

Reactor 提供 Scheduler 控制执行上下文:

  • Schedulers.immediate:立即执行。
  • Schedulers.single:单线程。
  • Schedulers.elastic:弹性线程池,适合阻塞任务。
  • Schedulers.parallel:固定大小线程池,适合 CPU 密集型任务。

切换执行上下文的方法:

  • publishOn(scheduler):影响其后所有操作符。
  • subscribeOn(scheduler):影响数据源的发射线程。

为 WebClient 创建独立线程池示例:

Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .publishOn(scheduler)
  .doOnNext(s -> printThreads());

效果:

5

可见名为 MyThreadGroup 的线程池已创建,避免阻塞主事件循环。

9.2. RxJava

RxJava 的调度机制与 Reactor 类似:

  • Schedulers.io():I/O 密集型任务。
  • Schedulers.computation():CPU 密集型任务。
  • subscribeOn():指定数据源发射线程。
  • observeOn():指定通知观察者的线程。

引入 RxJava 依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

使用示例:

io.reactivex.Observable
  .fromIterable(Arrays.asList("Tom", "Sawyer"))
  .map(s -> s.toUpperCase())
  .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
  .doOnComplete(this::printThreads);

效果:

6

可见 RxJava 调度器创建的线程已出现。

10. 总结

本文系统分析了 Spring WebFlux 的并发模型:

  • ✅ 响应式编程通过异步事件流和非阻塞 I/O,用更少线程实现高并发。
  • ✅ Netty 使用事件循环模型,线程数通常等于 CPU 核心数。
  • ✅ Tomcat 使用 Acceptor-Poller-Worker 模型,工作线程较多。
  • ✅ WebClient 和数据访问库(如 Reactor Kafka)可能引入独立线程池。
  • 关键技巧:使用 publishOn / subscribeOn 将阻塞操作调度到专用线程池,保护事件循环

掌握这些线程模型,才能在实践中避免性能瓶颈,充分发挥响应式架构的优势。

示例代码已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive


原始标题:Concurrency in Spring WebFlux | Baeldung