1. 概述

本文将对比 Java 19 的虚拟线程与 Project Reactor 的 WebFlux 框架。首先回顾两者的核心工作原理,然后分析各自的优缺点:

  • ✅ 先探讨响应式框架的优势,说明 WebFlux 的价值
  • ✅ 再讨论线程每请求模型,分析虚拟线程的适用场景
  • ❌ 避免基础概念赘述(假设读者熟悉并发编程)

2. 代码示例

假设我们正在开发电商应用后端,核心需求是计算并发布购物车商品价格。以下是基础实现:

class ProductService {
    private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";

    private final ProductRepository repository;
    private final DiscountService discountService;
    private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;

    // constructor

    public void addProductToCart(String productId, String cartId) {
        Product product = repository.findById(productId)
          .orElseThrow(() -> new IllegalArgumentException("not found!"));

        Price price = product.basePrice();
        if (product.category().isEligibleForDiscount()) {
            BigDecimal discount = discountService.discountForProduct(productId);
            price.setValue(price.getValue().subtract(discount));
        }

        var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
        kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
    }
}

执行流程:

  1. 从 MongoDB 查询商品(MongoRepository
  2. 若商品符合折扣条件,通过 HTTP 请求获取折扣(DiscountService
  3. 计算最终价格
  4. 发送 Kafka 消息(包含商品ID、购物车ID和价格)

3. WebFlux

WebFlux 是构建异步、非阻塞、事件驱动应用的框架,基于响应式编程原理,通过 FluxMono 类型处理异步通信。这些类型实现了发布-订阅模式,解耦数据的生产者和消费者。

3.1. 响应式库集成

Spring 生态的多个模块可与 WebFlux 集成。重构代码时需替换关键组件:

  • MongoRepository 替换为 ReactiveMongoRepository,返回 Mono<Product> 而非 Optional<Product>

    Mono<Product> product = repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
    
  • 使用 WebClient 替代阻塞式 HTTP 客户端,返回 Mono<BigDecimal>

    Mono<BigDecimal> discount = discountService.discountForProduct(productId);
    

3.2. 不可变性设计

函数式/响应式编程优先使用不可变对象。原代码通过 setter 修改 Price 值,重构后改为不可变设计:

record Price(BigDecimal value, String currency) {  
    public Price applyDiscount(BigDecimal discount) {
        return new Price(value.subtract(discount), currency);
    }
}

使用 map() 应用折扣:

Mono<Price> price = discountService.discountForProduct(productId)
  .map(discount -> price.applyDiscount(discount));

或更简洁的方法引用:

Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);

3.3. 函数式管道

Mono/Flux 遵循函子和单子模式,通过 map()/flatMap() 构建不可变数据的转换管道。业务流程可分解为:

  1. 原始 productId
  2. → 转换为 Product
  3. → 计算为 Price
  4. → 生成 event
  5. → 发布到消息队列

重构后的管道代码:

void addProductToCart(String productId, String cartId) {
    repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
      .flatMap(this::computePrice)
      .map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
      .subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}

Mono<Price> computePrice(Product product) {
    if (product.category().isEligibleForDiscount()) {
        return discountService.discountForProduct(product.id())
          .map(product.basePrice()::applyDiscount);
    }
    return Mono.just(product.basePrice());
}

4. 虚拟线程

虚拟线程是 Project Loom 引入的轻量级并发方案,由 JVM 管理的用户态线程。特别适合 I/O 密集型场景,避免传统线程在等待外部资源时的阻塞。

与响应式方案不同,虚拟线程允许继续使用线程每请求模型,保持代码的顺序性。

4.1. 虚拟线程使用方式

执行单任务时,使用 Thread.startVirtualThread()

public void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

private void computePriceAndPublishMessage(String productId, String cartId) {
    // 业务逻辑
}

批量任务场景,使用虚拟线程池:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

Spring Boot 3+ 可直接配置虚拟线程作为默认执行器。

4.2. 兼容性优势

虚拟线程采用传统同步模型,代码迁移成本极低。原阻塞代码几乎无需修改:

void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

void computePriceAndPublishMessage(String productId, String cartId) {
    Product product = repository.findById(productId)
      .orElseThrow(() -> new IllegalArgumentException("not found!"));

    Price price = computePrice(productId, product);

    var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
    kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}

Price computePrice(String productId, Product product) {
    if (product.category().isEligibleForDiscount()) {
        BigDecimal discount = discountService.discountForProduct(productId);
        return product.basePrice().applyDiscount(discount);
    }
    return product.basePrice();
}

4.3. 可读性提升

线程每请求模型显著降低认知负担

  • ✅ 业务逻辑与技术实现清晰分离
  • ✅ 无需混合响应式 API 与业务代码
  • ✅ 调试和错误追踪更直观

5. 结论

两种方案的核心差异:

维度 WebFlux 虚拟线程
编程模型 响应式(函数式管道) 传统阻塞式
代码迁移 需重构为不可变对象+异步链 几乎零修改
适用场景 高吞吐量、复杂异步流 I/O 密集型、遗留系统升级
学习成本 ⚠️ 较高(需理解响应式概念) ✅ 极低(传统编程思维)

选择建议

  • 新建高吞吐量系统 → WebFlux
  • 遗留系统改造或简单 I/O 场景 → 虚拟线程

所有代码示例可在 GitHub 获取。


原始标题:Reactor WebFlux vs Virtual Threads | Baeldung