Project Reactor 为 JVM 提供了完全非阻塞的编程基础,实现了 Reactive Streams 规范,并提供了 Flux 等可组合异步 API。Flux 作为 Reactive Streams 发布者,可发出 0 到 N 个元素后成功完成或报错。根据不同场景,Flux 提供了多种创建方式,其中 generate()create() 是两个核心方法。

2. 理解 Flux

Flux 是一个能发出 0 到 N 个元素的 Reactive Streams 发布者,内置多个操作符用于生成、编排和转换序列。Flux 要么成功完成,要么以错误结束。

Flux API 提供两类方法:

  • 静态工厂方法:创建 Flux 源或从回调生成序列
  • 实例方法/操作符:构建异步处理流水线,生成异步序列

接下来我们重点分析 generate()create() 方法的使用场景。

3. Maven 依赖

需要添加以下依赖(以 Maven 3.6.0 为例):

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.6.0</version>
    <scope>test</scope>
</dependency>

4. Flux Generate

generate() 方法提供简单粗暴的程序化 Flux 创建方式,通过生成函数按需计算并发出元素。主要适用于:

  • 元素计算成本高昂(避免计算下游不需要的数据)
  • 发出事件受应用状态影响

4.1 核心方法变体

三种重载形式:

  • generate(Consumer<SynchronousSink<T>> generator)
  • generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
  • generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

4.2 实战示例

使用带状态的 generate() 生成字符序列:

public class CharacterGenerator {
    
    public Flux<Character> generateCharacters() {
        
        return Flux.generate(() -> 97, (state, sink) -> {
            char value = (char) state.intValue();
            sink.next(value);
            if (value == 'z') {
                sink.complete();
            }
            return state + 1;
        });
    }
}

关键点解析:

  1. 初始状态:Callable 函数返回 97(ASCII 'a')
  2. 生成函数:BiFunction 接收状态和 SynchronousSink
    • 踩坑警告SynchronousSink 是同步的,每次调用最多只能触发一次 next()
    • 每次调用生成一个字符并更新状态
    • 到达 'z' 时调用 complete()

使用 StepVerifier 验证序列:

@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
    CharacterGenerator characterGenerator = new CharacterGenerator();
    Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);

    StepVerifier.create(characterFlux)
      .expectNext('a', 'b', 'c')
      .expectComplete()
      .verify();
}

测试逻辑:

  • 订阅者仅请求 3 个元素(take(3)
  • 生成序列被截断为 'a','b','c'
  • expectNext() 验证预期元素
  • expectComplete() 确认正常结束

5. Flux Create

create() 用于生成不受应用状态影响的 0 到 ∞ 个元素。与 generate() 的核心区别:

  • 持续计算元素(而非按需)
  • 下游系统消费速度决定元素处理方式:
    • 默认缓冲未消费元素
    • 可配置溢出策略(OverflowStrategy)

5.1 实战示例

创建字符序列生成器:

public class CharacterCreator {
    public Consumer<List<Character>> consumer;

    public Flux<Character> createCharacterSequence() {
        return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

关键特性:

  • 使用 FluxSink 替代 SynchronousSink
  • 支持一次性发出多个元素(forEach(sink::next)

多线程生产消费测试:

@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    CharacterGenerator characterGenerator = new CharacterGenerator();
    List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
    List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();

    CharacterCreator characterCreator = new CharacterCreator();
    Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
    Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));

    List<Character> consolidated = new ArrayList<>();
    characterCreator.createCharacterSequence().subscribe(consolidated::add);

    producerThread1.start();
    producerThread2.start();
    producerThread1.join();
    producerThread2.join();

    assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
}

执行流程:

  1. 创建两个字符序列(3个和2个元素)
  2. 启动两个生产线程,通过 consumer.accept() 注入数据
  3. 订阅者消费数据到 consolidated 列表
  4. 验证结果:5个字符乱序合并(异步特性)

6. Flux Create vs. Flux Generate

对比维度 Flux Create Flux Generate
参数类型 Consumer<FluxSink> Consumer<SynchronousSink>
调用次数 消费者函数仅调用一次 按需多次调用消费者函数
元素发射 可立即发射 0..N 个元素 每次调用仅发射一个元素
状态感知 不感知下游状态(需手动溢出控制) 根据下游需求生成元素
多线程支持 支持多线程并发发射 不适用(单元素同步发射)

7. 总结

本文深入剖析了 Project Reactor 中 Flux 的 create()generate() 方法:

  • **generate()**:适合状态驱动的按需生成,严格同步控制
  • **create()**:适合批量或异步生成,提供更灵活的溢出策略

选择建议:

  • 状态敏感/计算昂贵的场景 → 优先 generate()
  • 批量数据/异步生产场景 → 优先 create()

完整示例代码可参考 GitHub 仓库


原始标题:Difference Between Flux.create and Flux.generate

« 上一篇: Java Weekly, 第443期