1. 概述
在响应式编程中,经常需要将传统集合(Collections)转换为响应式流(Flux)。这是将现有数据结构集成到响应式管道中的关键步骤。
本文将探讨如何将元素集合转换为Flux流,特别聚焦于Project Reactor中的两种核心操作符:fromIterable和create。
2. 问题定义
Project Reactor中的两种主要发布器(Publisher)类型是Flux和Mono:
- ✅ Mono:最多发射一个值
- ✅ Flux:可发射任意数量的值
当获取*List
- 包装为*Mono<List
>*:一次性发射整个列表 - 转换为*Flux
*:允许订阅者按需分块处理数据
⚠️ 关键区别:对于大型列表,使用Flux能让订阅者按需请求数据,实现逐个或小批量处理,避免内存压力。
本文将重点讨论如何将已包含T类型元素的List转换为*Flux
- fromIterable:直接从集合创建流
- create:通过编程方式动态创建流
3. fromIterable操作符
先创建一个整数列表作为示例:
List<Integer> list = List.of(1, 2, 3);
fromIterable是Flux的核心操作符,用于发射集合中的所有元素。通过*log()*操作符可以观察发射过程:
private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
return Flux
.fromIterable(list)
.log();
}
测试转换效果:
@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted(){
List<Integer> list = List.of(1, 2, 3);
Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
✅ 验证要点:
- 使用StepVerifier验证发射元素
- expectNext确保元素顺序与原列表一致
- expectComplete确认流正常结束
4. create操作符
create操作符通过FluxSink API以编程方式创建流。虽然fromIterable适合大多数场景,但当列表由回调动态生成时,create更灵活。
首先定义回调接口:
public interface Callback<T> {
void onTrigger(T element);
}
模拟异步API调用返回列表:
private void asynchronousApiCall(Callback<List<Integer>> callback) {
Thread thread = new Thread(()-> {
List<Integer> list = List.of(1, 2,3);
callback.onTrigger(list);
});
thread.start();
}
在回调中使用FluxSink逐个发射元素:
@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {
Flux<Integer> flux = Flux.create(sink -> {
Callback<List<Integer>> callback = list -> {
list.forEach(sink::next);
sink.complete();
};
asynchronousApiCall(callback);
});
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
⚠️ 关键点:
- 在回调中逐个调用sink.next()
- 显式调用*sink.complete()*结束流
- 适合异步数据源场景
5. 总结
本文探讨了将*List
操作符 | 适用场景 | 优点 |
---|---|---|
fromIterable | 已有静态集合 | 简单直接,代码简洁 |
create | 回调生成的动态集合 | 灵活控制发射时机和过程 |
✅ 最佳实践建议:
- 优先使用fromIterable处理静态集合
- 在异步回调场景下选择create操作符
- 避免在内存敏感场景一次性发射超大列表
完整源码可在GitHub仓库获取。