1. 引言
本文深入探讨使用 Spring WebFlux 编写的响应式程序中的并发机制。
我们将从响应式编程与并发的关系讲起,接着分析 Spring WebFlux 如何在不同响应式服务器库之上提供并发抽象。目标是帮助有经验的开发者真正理解底层线程模型,避免踩坑。
2. 响应式编程的动机
典型的 Web 应用由多个复杂且相互协作的组件构成。其中很多交互本质上是阻塞的,比如数据库查询或更新操作。但也有大量操作是相互独立的,完全可以并发甚至并行执行。
例如,Web 服务器可以用不同线程处理两个用户请求。在多核平台上,这能显著降低整体响应时间。这种模式被称为 “每请求一线程”(thread-per-request)模型:
如图所示,每个线程一次只处理一个请求。
虽然基于线程的并发解决了部分问题,但它并未解决单个线程内多数操作仍是阻塞的这一事实。更关键的是,Java 的原生线程在上下文切换时开销巨大。
随着 Web 应用面临越来越多的请求,“每请求一线程”模型逐渐力不从心。因此,我们需要一种能用更少线程处理更多请求的并发模型——这正是采用 响应式编程 的核心动机之一。
3. 响应式编程中的并发
响应式编程 的核心是将程序结构化为数据流,并管理变化的传播。在一个完全非阻塞的环境中,它能以更少的资源实现更高的并发。
但响应式编程是否完全抛弃了线程?不完全是。它的不同之处在于对线程的使用方式——核心在于异步性。
程序流从一连串同步操作,转变为异步事件流。例如,在响应式模型中,数据库读取调用不会阻塞当前线程,而是立即返回一个发布者(Publisher),供其他组件订阅。订阅者在事件发生后处理数据,甚至可以生成新的事件:
最关键的是,响应式编程并不关心事件在哪个线程上生成或消费,重点在于将程序结构化为异步事件流。发布者和订阅者无需在同一线程运行,这有助于更高效地利用线程资源,从而提升整体并发能力。
4. 事件循环(Event Loop)
有多种编程模型支持响应式并发,本节介绍其中一种核心模型:事件循环(Event Loop)模型。
这是一个典型的事件循环抽象设计:
- ✅ 事件循环在单个线程中持续运行,但可根据 CPU 核心数创建多个事件循环。
- ✅ 它从事件队列中顺序处理事件,注册回调后立即返回。
- ✅ 底层平台(如网络 I/O、数据库)完成操作后会通知事件循环。
- ✅ 事件循环触发回调,将结果返回给原始调用方。
该模型被 Node.js、Netty 和 Nginx 等平台广泛采用,相比 Apache HTTP Server、Tomcat 等传统阻塞式服务器,具备更强的可扩展性。
5. Spring WebFlux 中的响应式编程
Spring WebFlux 是 Spring 5.0 引入的响应式栈 Web 框架。我们来看它的服务端技术栈如何与传统栈并存:
关键点:
- ✅ WebFlux 并非取代传统 Spring MVC,而是并行存在。
- ✅ 扩展了注解式编程模型,支持函数式路由。
- ✅ 通过 Reactive Streams API 适配多种 HTTP 运行时,实现运行时互操作。
- ✅ 支持多种响应式运行时:Servlet 3.1+(Tomcat)、Reactor Netty、Undertow 等。
- ✅ 内置 WebClient —— 一个非阻塞、响应式的 HTTP 客户端,提供函数式流畅 API。
6. 支持的运行时中的线程模型
响应式程序通常只使用少量线程,但具体线程数量和行为取决于所选的响应式运行时。
Spring WebFlux 通过 HttpHandler API 抽象不同服务器的实现,实现运行时无关性。该接口仅一个方法,统一了 Reactor Netty、Servlet 3.1、Undertow 等底层差异。
可以通过排除依赖切换服务器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
查看 JVM 中线程的简单方式:
Thread.getAllStackTraces()
.keySet()
.stream()
.collect(Collectors.toList());
6.1. Reactor Netty
Reactor Netty 是 Spring Boot WebFlux 的默认嵌入式服务器。启动一个空的 WebFlux 应用,观察其创建的线程:
可见 Netty 创建了多个工作线程(数量通常等于 CPU 核心数),以及一些 JVM 常驻线程。
Netty 使用事件循环模型实现高并发:
- ✅ EventLoopGroup 管理一个或多个持续运行的 EventLoop。
- ⚠️ 不建议创建超过 CPU 核心数的 EventLoop。
- ✅ 每个 Channel 生命周期内,所有操作由同一个 EventLoop 线程执行。
6.2. Apache Tomcat
Spring WebFlux 也支持传统 Servlet 容器如 Apache Tomcat,依赖 Servlet 3.1 的非阻塞 I/O 特性。
在 Tomcat 上运行 WebFlux 应用,线程情况如下:
与 Netty 不同,Tomcat 默认启动更多工作线程(默认 10 个)。
其 NIO Connector 架构包含三个核心组件:
- ✅ Acceptor:接收新连接,通常 1 个线程。
- ✅ Poller:监听 I/O 事件,可配置多个线程。
- ✅ Worker:处理请求业务逻辑,使用独立线程池。
7. WebClient 的线程模型
WebClient 是 Spring WebFlux 提供的响应式 HTTP 客户端,用于构建端到端响应式应用。
7.1. 使用 WebClient
无需额外依赖,直接使用:
@GetMapping("/index")
public Mono<String> getIndex() {
return Mono.just("Hello World!");
}
用 WebClient 调用该接口:
WebClient.create("http://localhost:8080/index").get()
.retrieve()
.bodyToMono(String.class)
.doOnNext(s -> printThreads());
7.2. 线程模型解析
- ✅ WebClient 默认也采用事件循环模型。
- ✅ 在 Reactor Netty 上运行时,与服务器共享同一个事件循环,线程无明显变化。
- ⚠️ 在 Jetty 等 Servlet 容器上,WebClient 需创建独立的事件循环,因此会新增线程:
- ✅ 为 WebClient 分配独立线程池可提升性能,避免客户端操作阻塞服务器事件循环。
8. 数据访问库的线程模型
典型应用常需连接数据库、消息中间件等。传统库多为阻塞式,但响应式库正快速普及。
8.1. Spring Data MongoDB
通过添加依赖启用响应式 MongoDB 支持:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
定义响应式仓库:
public interface PersonRepository extends ReactiveMongoRepository<Person, ObjectId> {
}
// 使用
personRepository.findAll().doOnComplete(this::printThreads);
- ✅ Spring Data 响应式仓库默认复用服务器的事件循环,在 Netty 上运行时线程无显著变化。
8.2. Reactor Kafka
Spring 官方对响应式 Kafka 的支持仍在完善,但可直接使用 Reactor Kafka。
添加依赖:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.10</version>
</dependency>
非阻塞生产消息:
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux
.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();
非阻塞消费消息:
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads);
线程情况:
- ✅ Reactor Kafka 管理独立的线程池,用于 Kafka 消息的收发处理。
- ✅ 生产者使用专用网络线程发送请求,响应通过单线程调度器返回。
- ✅ 消费者每组一个线程监听消息,实际处理由另一线程池执行。
9. WebFlux 中的调度选项
响应式编程在完全非阻塞环境下表现优异,但一旦出现阻塞操作,可能冻结整个事件循环,导致严重性能问题。
9.1. Reactor
Reactor 提供 Scheduler
控制执行上下文:
- ✅
Schedulers.immediate
:立即执行。 - ✅
Schedulers.single
:单线程。 - ✅
Schedulers.elastic
:弹性线程池,适合阻塞任务。 - ✅
Schedulers.parallel
:固定大小线程池,适合 CPU 密集型任务。
切换执行上下文的方法:
- ✅
publishOn(scheduler)
:影响其后所有操作符。 - ✅
subscribeOn(scheduler)
:影响数据源的发射线程。
为 WebClient 创建独立线程池示例:
Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");
WebClient.create("http://localhost:8080/index").get()
.retrieve()
.bodyToMono(String.class)
.publishOn(scheduler)
.doOnNext(s -> printThreads());
效果:
可见名为 MyThreadGroup
的线程池已创建,避免阻塞主事件循环。
9.2. RxJava
RxJava 的调度机制与 Reactor 类似:
- ✅
Schedulers.io()
:I/O 密集型任务。 - ✅
Schedulers.computation()
:CPU 密集型任务。 - ✅
subscribeOn()
:指定数据源发射线程。 - ✅
observeOn()
:指定通知观察者的线程。
引入 RxJava 依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
使用示例:
io.reactivex.Observable
.fromIterable(Arrays.asList("Tom", "Sawyer"))
.map(s -> s.toUpperCase())
.observeOn(io.reactivex.schedulers.Schedulers.trampoline())
.doOnComplete(this::printThreads);
效果:
可见 RxJava 调度器创建的线程已出现。
10. 总结
本文系统分析了 Spring WebFlux 的并发模型:
- ✅ 响应式编程通过异步事件流和非阻塞 I/O,用更少线程实现高并发。
- ✅ Netty 使用事件循环模型,线程数通常等于 CPU 核心数。
- ✅ Tomcat 使用 Acceptor-Poller-Worker 模型,工作线程较多。
- ✅ WebClient 和数据访问库(如 Reactor Kafka)可能引入独立线程池。
- ✅ 关键技巧:使用
publishOn
/subscribeOn
将阻塞操作调度到专用线程池,保护事件循环。
掌握这些线程模型,才能在实践中避免性能瓶颈,充分发挥响应式架构的优势。
示例代码已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive