1. 概述

在响应式编程中,经常需要将传统集合(Collections)转换为响应式流(Flux)。这是将现有数据结构集成到响应式管道中的关键步骤。

本文将探讨如何将元素集合转换为Flux流,特别聚焦于Project Reactor中的两种核心操作符:fromIterablecreate

2. 问题定义

Project Reactor中的两种主要发布器(Publisher)类型是FluxMono

  • Mono:最多发射一个值
  • Flux:可发射任意数量的值

当获取*List*时,有两种处理方式:

  1. 包装为*Mono<List>*:一次性发射整个列表
  2. 转换为*Flux*:允许订阅者按需分块处理数据

⚠️ 关键区别:对于大型列表,使用Flux能让订阅者按需请求数据,实现逐个或小批量处理,避免内存压力。

本文将重点讨论如何将已包含T类型元素的List转换为*Flux*,主要使用以下操作符:

  • fromIterable:直接从集合创建流
  • create:通过编程方式动态创建流

3. fromIterable操作符

先创建一个整数列表作为示例:

List<Integer> list = List.of(1, 2, 3);

fromIterableFlux的核心操作符,用于发射集合中的所有元素。通过*log()*操作符可以观察发射过程:

private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
    return Flux
      .fromIterable(list)
      .log();
}

测试转换效果:

@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted(){

    List<Integer> list = List.of(1, 2, 3);
    Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);

    StepVerifier.create(flux)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectComplete()
      .verify();
}

✅ 验证要点:

  • 使用StepVerifier验证发射元素
  • expectNext确保元素顺序与原列表一致
  • expectComplete确认流正常结束

4. create操作符

create操作符通过FluxSink API以编程方式创建流。虽然fromIterable适合大多数场景,但当列表由回调动态生成时,create更灵活。

首先定义回调接口:

public interface Callback<T>  {
    void onTrigger(T element);
}

模拟异步API调用返回列表:

private void asynchronousApiCall(Callback<List<Integer>> callback) {
    Thread thread = new Thread(()-> {
      List<Integer> list = List.of(1, 2,3);
      callback.onTrigger(list);
    });
    thread.start();
}

在回调中使用FluxSink逐个发射元素:

@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {

    Flux<Integer> flux = Flux.create(sink -> {
      Callback<List<Integer>> callback = list -> {
        list.forEach(sink::next);
        sink.complete();
      };
      asynchronousApiCall(callback);
    });

    StepVerifier.create(flux)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectComplete()
      .verify();
}

⚠️ 关键点:

  • 在回调中逐个调用sink.next()
  • 显式调用*sink.complete()*结束流
  • 适合异步数据源场景

5. 总结

本文探讨了将*List转换为Flux*的两种核心方法:

操作符 适用场景 优点
fromIterable 已有静态集合 简单直接,代码简洁
create 回调生成的动态集合 灵活控制发射时机和过程

✅ 最佳实践建议:

  • 优先使用fromIterable处理静态集合
  • 在异步回调场景下选择create操作符
  • 避免在内存敏感场景一次性发射超大列表

完整源码可在GitHub仓库获取。


原始标题:How to convert List to Flux in Project Reactor | Baeldung