Project Reactor 为 JVM 提供了完全非阻塞的编程基础,实现了 Reactive Streams 规范,并提供了 Flux 等可组合异步 API。Flux 作为 Reactive Streams 发布者,可发出 0 到 N 个元素后成功完成或报错。根据不同场景,Flux 提供了多种创建方式,其中 generate()
和 create()
是两个核心方法。
2. 理解 Flux
Flux 是一个能发出 0 到 N 个元素的 Reactive Streams 发布者,内置多个操作符用于生成、编排和转换序列。Flux 要么成功完成,要么以错误结束。
Flux API 提供两类方法:
- 静态工厂方法:创建 Flux 源或从回调生成序列
- 实例方法/操作符:构建异步处理流水线,生成异步序列
接下来我们重点分析 generate()
和 create()
方法的使用场景。
3. Maven 依赖
需要添加以下依赖(以 Maven 3.6.0 为例):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
4. Flux Generate
generate()
方法提供简单粗暴的程序化 Flux 创建方式,通过生成函数按需计算并发出元素。主要适用于:
- 元素计算成本高昂(避免计算下游不需要的数据)
- 发出事件受应用状态影响
4.1 核心方法变体
三种重载形式:
generate(Consumer<SynchronousSink<T>> generator)
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
4.2 实战示例
使用带状态的 generate()
生成字符序列:
public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}
关键点解析:
- 初始状态:
Callable
函数返回97
(ASCII 'a') - 生成函数:
BiFunction
接收状态和SynchronousSink
- 踩坑警告:
SynchronousSink
是同步的,每次调用最多只能触发一次next()
- 每次调用生成一个字符并更新状态
- 到达 'z' 时调用
complete()
- 踩坑警告:
使用 StepVerifier
验证序列:
@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}
测试逻辑:
- 订阅者仅请求 3 个元素(
take(3)
) - 生成序列被截断为 'a','b','c'
expectNext()
验证预期元素expectComplete()
确认正常结束
5. Flux Create
create()
用于生成不受应用状态影响的 0 到 ∞ 个元素。与 generate()
的核心区别:
- 持续计算元素(而非按需)
- 下游系统消费速度决定元素处理方式:
- 默认缓冲未消费元素
- 可配置溢出策略(OverflowStrategy)
5.1 实战示例
创建字符序列生成器:
public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}
关键特性:
- 使用
FluxSink
替代SynchronousSink
- 支持一次性发出多个元素(
forEach(sink::next)
)
多线程生产消费测试:
@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));
List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);
producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
}
执行流程:
- 创建两个字符序列(3个和2个元素)
- 启动两个生产线程,通过
consumer.accept()
注入数据 - 订阅者消费数据到
consolidated
列表 - 验证结果:5个字符乱序合并(异步特性)
6. Flux Create vs. Flux Generate
对比维度 | Flux Create | Flux Generate |
---|---|---|
参数类型 | Consumer<FluxSink> |
Consumer<SynchronousSink> |
调用次数 | 消费者函数仅调用一次 | 按需多次调用消费者函数 |
元素发射 | 可立即发射 0..N 个元素 | 每次调用仅发射一个元素 |
状态感知 | 不感知下游状态(需手动溢出控制) | 根据下游需求生成元素 |
多线程支持 | 支持多线程并发发射 | 不适用(单元素同步发射) |
7. 总结
本文深入剖析了 Project Reactor 中 Flux 的 create()
和 generate()
方法:
- **
generate()
**:适合状态驱动的按需生成,严格同步控制 - **
create()
**:适合批量或异步生成,提供更灵活的溢出策略
选择建议:
- 状态敏感/计算昂贵的场景 → 优先
generate()
- 批量数据/异步生产场景 → 优先
create()
完整示例代码可参考 GitHub 仓库