1. 概述

在Spring WebFlux响应式流中使用条件语句,可以在处理数据流时实现动态决策。与命令式编程不同,响应式编程的条件逻辑不局限于if-else语句。我们可以利用多种操作符(如map()filter()switchIfEmpty()等)来构建条件流,同时保持非阻塞特性。

本文将探讨在Spring WebFlux中实现条件逻辑的多种方法。除非特别说明,这些方法同时适用于MonoFlux

2. 使用map()进行条件构建

map()操作符可用于转换流中的元素。我们可以在映射函数中使用if-else语句实现条件转换:

Flux<String> oddEvenFlux = Flux.just(1, 2, 3, 4, 5, 6)
  .map(num -> {
    if (num % 2 == 0) {
      return "Even";
    } else {
      return "Odd";
    }
  });

⚠️ 注意:map()是同步操作,元素发出后会立即执行转换。

使用StepVerifier验证条件标记逻辑:

StepVerifier.create(oddEvenFlux)
  .expectNext("Odd")
  .expectNext("Even")
  .expectNext("Odd")
  .expectNext("Even")
  .expectNext("Odd")
  .expectNext("Even")
  .verifyComplete();

✅ 每个数字都根据奇偶性正确标记。

3. 使用filter()

filter()操作符通过谓词(predicate)过滤元素,确保只有符合条件的元素传递到下游:

Flux<Integer> evenNumbersFlux = Flux.just(1, 2, 3, 4, 5, 6)
  .filter(num -> num % 2 == 0);

验证过滤结果:

StepVerifier.create(evenNumbersFlux)
  .expectNext(2)
  .expectNext(4)
  .expectNext(6)
  .verifyComplete();

✅ 仅偶数通过过滤。

4. 使用switchIfEmpty()defaultIfEmpty()

当上游流为空时,这两个操作符提供条件处理方案。

4.1. 使用switchIfEmpty()

当上游流为空时切换到备用流:

Flux<String> flux = Flux.just("A", "B", "C", "D", "E")
  .filter(word -> word.length() >= 2)
  .switchIfEmpty(Flux.defer(() -> Flux.just("AA", "BB", "CC")));

Flux.defer()确保备用流仅在需要时创建。验证切换逻辑:

StepVerifier.create(flux)
  .expectNext("AA")
  .expectNext("BB")
  .expectNext("CC")
  .verifyComplete();

✅ 上游为空时成功切换到备用流。

4.2. 使用defaultIfEmpty()

当上游流为空时提供默认值:

flux = flux.defaultIfEmpty("No words found!");

验证默认值逻辑:

StepVerifier.create(flux)
  .expectNext("No words found!")
  .verifyComplete();

✅ 上游为空时返回默认值。

5. 使用flatMap()

flatMap()操作符可创建多个条件分支,同时保持异步非阻塞特性:

Flux<String> flux = Flux.just("A", "B", "C")
  .flatMap(word -> {
    if (word.startsWith("A")) {
      return Flux.just(word + "1", word + "2", word + "3");
    } else {
      return Flux.just(word);
    }
  })
  .flatMap(word -> {
    if (word.startsWith("B")) {
      return Flux.just(word + "1", word + "2");
    } else {
      return Flux.just(word);
    }
  });

验证条件分支逻辑:

StepVerifier.create(flux)
  .expectNext("A1")
  .expectNext("A2")
  .expectNext("A3")
  .expectNext("B1")
  .expectNext("B2")
  .expectNext("C")
  .verifyComplete();

✅ 多级条件分支正确执行。
⚠️ 对于Mono类型,可使用flatMapMany()实现类似功能。

6. 使用副作用操作符

6.1. 使用doOnNext()

为每个元素执行同步副作用操作:

AtomicInteger evenCounter = new AtomicInteger(0);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
  .doOnNext(num -> {
    if (num % 2 == 0) {
      evenCounter.incrementAndGet();
    }
  });

验证计数器状态:

StepVerifier.create(flux)
  .expectNextMatches(num -> num == 1 && evenCounter.get() == 0)
  .expectNextMatches(num -> num == 2 && evenCounter.get() == 1)
  .expectNextMatches(num -> num == 3 && evenCounter.get() == 1)
  .expectNextMatches(num -> num == 4 && evenCounter.get() == 2)
  .expectNextMatches(num -> num == 5 && evenCounter.get() == 2)
  .expectNextMatches(num -> num == 6 && evenCounter.get() == 3)
  .verifyComplete();

✅ 偶数计数器正确递增。

6.2. 使用doOnComplete()

在流完成时执行操作:

AtomicBoolean done = new AtomicBoolean(false);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
  .doOnComplete(() -> done.set(true));

验证完成信号处理:

StepVerifier.create(flux)
  .expectNextMatches(num -> num == 1 && !done.get())
  .expectNextMatches(num -> num == 2 && !done.get())
  .expectNextMatches(num -> num == 3 && !done.get())
  .expectNextMatches(num -> num == 4 && !done.get())
  .expectNextMatches(num -> num == 5 && !done.get())
  .expectNextMatches(num -> num == 6 && !done.get())
  .then(() -> Assertions.assertTrue(done.get()))
  .expectComplete()
  .verify();

✅ 完成信号仅在所有元素发出后触发。
⚠️ Mono类型应使用doOnSuccess()

7. 使用firstWithValue()

从多个源中选择最先发出值的源(适用于低延迟场景):

Mono<String[]> source1 = Mono.defer(() -> Mono.just(new String[] { "val", "source1" })
  .delayElement(Duration.ofMillis(200)));
Mono<String[]> source2 = Mono.defer(() -> Mono.just(new String[] { "val", "source2" })
  .delayElement(Duration.ofMillis(10)));

Mono<String[]> mono = Mono.firstWithValue(source1, source2);

验证低延迟源优先:

StepVerifier.create(mono)
  .expectNextMatches(item -> "val".equals(item[0]) && "source2".equals(item[1]))
  .verifyComplete();

✅ 延迟更低的source2被优先选择。
⚠️ 仅适用于Mono类型。

8. 使用zip()zipWhen()

8.1. 使用zip()

组合多个源并添加条件逻辑(如数据一致性检查):

Mono<String> dataFromDB = Mono.defer(() -> Mono.just("db_val")
  .delayElement(Duration.ofMillis(200)));
Mono<String> dataFromCache = Mono.defer(() -> Mono.just("cache_val")
  .delayElement(Duration.ofMillis(10)));

Mono<String[]> mono = Mono.zip(dataFromDB, dataFromCache, 
  (dbValue, cacheValue) -> 
  new String[] { dbValue, dbValue.equals(cacheValue) ? "VALID_CACHE" : "INVALID_CACHE" });

验证缓存一致性检查:

StepVerifier.create(mono)
  .expectNextMatches(item -> "db_val".equals(item[0]) && "INVALID_CACHE".equals(item[1]))
  .verifyComplete();

✅ 正确识别缓存不一致。

8.2. 使用zipWhen()

仅在第一个源有值时处理第二个源:

int userId = 1;
Mono<String> userAgeCategory = Mono.defer(() -> Mono.just(userId))
  .zipWhen(id -> Mono.defer(() -> Mono.just(20)), (id, age) -> age >= 18 ? "ADULT" : "KID");

验证用户分类:

StepVerifier.create(userAgeCategory)
  .expectNext("ADULT")
  .verifyComplete();

✅ 有效用户被正确分类。
模拟无效用户场景:

Mono<String> noUserDetail = Mono.empty()
  .zipWhen(id -> Mono.just(20), (id, age) -> age >= 18 ? "ADULT" : "KID");

StepVerifier.create(noUserDetail)
  .verifyComplete();

✅ 无效用户不产生输出。
⚠️ 仅适用于Mono类型。

9. 结论

本文系统介绍了在Spring WebFlux中实现条件逻辑的多种方法,包括:

  • ✅ 基础转换:map()filter()
  • ✅ 空值处理:switchIfEmpty()defaultIfEmpty()
  • ✅ 分支处理:flatMap()
  • ✅ 副作用操作:doOnNext()doOnComplete()
  • ✅ 竞争选择:firstWithValue()
  • ✅ 组合处理:zip()zipWhen()

这些操作符为响应式流提供了灵活的条件控制能力,同时保持了非阻塞特性。实际开发中应根据具体场景选择最合适的方案,避免踩坑。

本文代码示例可在GitHub仓库获取。


原始标题:Conditional Statements in Spring WebFlux Reactive Flow | Baeldung