1. 概述
在Spring WebFlux响应式流中使用条件语句,可以在处理数据流时实现动态决策。与命令式编程不同,响应式编程的条件逻辑不局限于if-else
语句。我们可以利用多种操作符(如map()
、filter()
、switchIfEmpty()
等)来构建条件流,同时保持非阻塞特性。
本文将探讨在Spring WebFlux中实现条件逻辑的多种方法。除非特别说明,这些方法同时适用于Mono
和Flux
。
2. 使用map()
进行条件构建
map()
操作符可用于转换流中的元素。我们可以在映射函数中使用if-else
语句实现条件转换:
Flux<String> oddEvenFlux = Flux.just(1, 2, 3, 4, 5, 6)
.map(num -> {
if (num % 2 == 0) {
return "Even";
} else {
return "Odd";
}
});
⚠️ 注意:map()
是同步操作,元素发出后会立即执行转换。
使用StepVerifier
验证条件标记逻辑:
StepVerifier.create(oddEvenFlux)
.expectNext("Odd")
.expectNext("Even")
.expectNext("Odd")
.expectNext("Even")
.expectNext("Odd")
.expectNext("Even")
.verifyComplete();
✅ 每个数字都根据奇偶性正确标记。
3. 使用filter()
filter()
操作符通过谓词(predicate)过滤元素,确保只有符合条件的元素传递到下游:
Flux<Integer> evenNumbersFlux = Flux.just(1, 2, 3, 4, 5, 6)
.filter(num -> num % 2 == 0);
验证过滤结果:
StepVerifier.create(evenNumbersFlux)
.expectNext(2)
.expectNext(4)
.expectNext(6)
.verifyComplete();
✅ 仅偶数通过过滤。
4. 使用switchIfEmpty()
和defaultIfEmpty()
当上游流为空时,这两个操作符提供条件处理方案。
4.1. 使用switchIfEmpty()
当上游流为空时切换到备用流:
Flux<String> flux = Flux.just("A", "B", "C", "D", "E")
.filter(word -> word.length() >= 2)
.switchIfEmpty(Flux.defer(() -> Flux.just("AA", "BB", "CC")));
Flux.defer()
确保备用流仅在需要时创建。验证切换逻辑:
StepVerifier.create(flux)
.expectNext("AA")
.expectNext("BB")
.expectNext("CC")
.verifyComplete();
✅ 上游为空时成功切换到备用流。
4.2. 使用defaultIfEmpty()
当上游流为空时提供默认值:
flux = flux.defaultIfEmpty("No words found!");
验证默认值逻辑:
StepVerifier.create(flux)
.expectNext("No words found!")
.verifyComplete();
✅ 上游为空时返回默认值。
5. 使用flatMap()
flatMap()
操作符可创建多个条件分支,同时保持异步非阻塞特性:
Flux<String> flux = Flux.just("A", "B", "C")
.flatMap(word -> {
if (word.startsWith("A")) {
return Flux.just(word + "1", word + "2", word + "3");
} else {
return Flux.just(word);
}
})
.flatMap(word -> {
if (word.startsWith("B")) {
return Flux.just(word + "1", word + "2");
} else {
return Flux.just(word);
}
});
验证条件分支逻辑:
StepVerifier.create(flux)
.expectNext("A1")
.expectNext("A2")
.expectNext("A3")
.expectNext("B1")
.expectNext("B2")
.expectNext("C")
.verifyComplete();
✅ 多级条件分支正确执行。
⚠️ 对于Mono
类型,可使用flatMapMany()
实现类似功能。
6. 使用副作用操作符
6.1. 使用doOnNext()
为每个元素执行同步副作用操作:
AtomicInteger evenCounter = new AtomicInteger(0);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
.doOnNext(num -> {
if (num % 2 == 0) {
evenCounter.incrementAndGet();
}
});
验证计数器状态:
StepVerifier.create(flux)
.expectNextMatches(num -> num == 1 && evenCounter.get() == 0)
.expectNextMatches(num -> num == 2 && evenCounter.get() == 1)
.expectNextMatches(num -> num == 3 && evenCounter.get() == 1)
.expectNextMatches(num -> num == 4 && evenCounter.get() == 2)
.expectNextMatches(num -> num == 5 && evenCounter.get() == 2)
.expectNextMatches(num -> num == 6 && evenCounter.get() == 3)
.verifyComplete();
✅ 偶数计数器正确递增。
6.2. 使用doOnComplete()
在流完成时执行操作:
AtomicBoolean done = new AtomicBoolean(false);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
.doOnComplete(() -> done.set(true));
验证完成信号处理:
StepVerifier.create(flux)
.expectNextMatches(num -> num == 1 && !done.get())
.expectNextMatches(num -> num == 2 && !done.get())
.expectNextMatches(num -> num == 3 && !done.get())
.expectNextMatches(num -> num == 4 && !done.get())
.expectNextMatches(num -> num == 5 && !done.get())
.expectNextMatches(num -> num == 6 && !done.get())
.then(() -> Assertions.assertTrue(done.get()))
.expectComplete()
.verify();
✅ 完成信号仅在所有元素发出后触发。
⚠️ Mono
类型应使用doOnSuccess()
。
7. 使用firstWithValue()
从多个源中选择最先发出值的源(适用于低延迟场景):
Mono<String[]> source1 = Mono.defer(() -> Mono.just(new String[] { "val", "source1" })
.delayElement(Duration.ofMillis(200)));
Mono<String[]> source2 = Mono.defer(() -> Mono.just(new String[] { "val", "source2" })
.delayElement(Duration.ofMillis(10)));
Mono<String[]> mono = Mono.firstWithValue(source1, source2);
验证低延迟源优先:
StepVerifier.create(mono)
.expectNextMatches(item -> "val".equals(item[0]) && "source2".equals(item[1]))
.verifyComplete();
✅ 延迟更低的source2
被优先选择。
⚠️ 仅适用于Mono
类型。
8. 使用zip()
和zipWhen()
8.1. 使用zip()
组合多个源并添加条件逻辑(如数据一致性检查):
Mono<String> dataFromDB = Mono.defer(() -> Mono.just("db_val")
.delayElement(Duration.ofMillis(200)));
Mono<String> dataFromCache = Mono.defer(() -> Mono.just("cache_val")
.delayElement(Duration.ofMillis(10)));
Mono<String[]> mono = Mono.zip(dataFromDB, dataFromCache,
(dbValue, cacheValue) ->
new String[] { dbValue, dbValue.equals(cacheValue) ? "VALID_CACHE" : "INVALID_CACHE" });
验证缓存一致性检查:
StepVerifier.create(mono)
.expectNextMatches(item -> "db_val".equals(item[0]) && "INVALID_CACHE".equals(item[1]))
.verifyComplete();
✅ 正确识别缓存不一致。
8.2. 使用zipWhen()
仅在第一个源有值时处理第二个源:
int userId = 1;
Mono<String> userAgeCategory = Mono.defer(() -> Mono.just(userId))
.zipWhen(id -> Mono.defer(() -> Mono.just(20)), (id, age) -> age >= 18 ? "ADULT" : "KID");
验证用户分类:
StepVerifier.create(userAgeCategory)
.expectNext("ADULT")
.verifyComplete();
✅ 有效用户被正确分类。
模拟无效用户场景:
Mono<String> noUserDetail = Mono.empty()
.zipWhen(id -> Mono.just(20), (id, age) -> age >= 18 ? "ADULT" : "KID");
StepVerifier.create(noUserDetail)
.verifyComplete();
✅ 无效用户不产生输出。
⚠️ 仅适用于Mono
类型。
9. 结论
本文系统介绍了在Spring WebFlux中实现条件逻辑的多种方法,包括:
- ✅ 基础转换:
map()
、filter()
- ✅ 空值处理:
switchIfEmpty()
、defaultIfEmpty()
- ✅ 分支处理:
flatMap()
- ✅ 副作用操作:
doOnNext()
、doOnComplete()
- ✅ 竞争选择:
firstWithValue()
- ✅ 组合处理:
zip()
、zipWhen()
这些操作符为响应式流提供了灵活的条件控制能力,同时保持了非阻塞特性。实际开发中应根据具体场景选择最合适的方案,避免踩坑。
本文代码示例可在GitHub仓库获取。