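1. Overview

In this article, we explore several techniques for partitioning a Java 8 Stream based on a fixed maximum size. We start by revisiting how to partition a List, and then improve the approach with Stream-specific capabilities such as lazy evaluation and thread safety.

Key Points

  • ✅ Fixed-size partitioning
  • ✅ Works with parallel streams
  • ⚠️ Thread safety requires care in the implementation
  • ✅ Complete code examples

2. Partitioning a List

There are several ways to partition a List in Java. The most straightforward one starts by working out, from the desired batch size and the size of the source list, how many batches are needed. In the snippet below, nrOfFullBatches ends up holding the zero-based index of the last batch: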

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    // ...
}

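To split the source list into smaller sublists, we first need the start and end index of each batch. Keep in mind that the last batch may be smaller than the others: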

int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
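
To make the index arithmetic concrete, the sketch below applies the same two expressions to a plain size and batch size and records the resulting boundaries. The class BatchBounds and its bounds method are hypothetical names introduced only for this illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, used only to illustrate the index arithmetic above.
class BatchBounds {

    // Returns one "start-end" string (end exclusive) per batch.
    static List<String> bounds(int size, int batchSize) {
        List<String> result = new ArrayList<>();
        if (size == 0) {
            return result; // nothing to partition
        }
        int nrOfFullBatches = (size - 1) / batchSize; // zero-based index of the last batch
        for (int batch = 0; batch <= nrOfFullBatches; batch++) {
            int startIndex = batch * batchSize;
            int endIndex = (batch == nrOfFullBatches) ? size : (batch + 1) * batchSize;
            result.add(startIndex + "-" + endIndex);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(bounds(8, 3)); // [0-3, 3-6, 6-8]
        System.out.println(bounds(6, 3)); // [0-3, 3-6]
    }
}
```

Subtracting 1 before dividing is what keeps a size that is an exact multiple of the batch size (6 and 3 here) from producing an extra, empty batch.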

Finally, we add validation for corner cases such as an empty list or a non-positive batch size:

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException(String.format("The batchSize must be greater than 0. Actual value: %s", batchSize));
    }
    if (source.isEmpty()) {
        return Stream.empty();
    }
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    return IntStream.rangeClosed(0, nrOfFullBatches)
      .mapToObj(batch -> {
          int startIndex = batch * batchSize;
          int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
          return source.subList(startIndex, endIndex);
      });
}

A test confirms the behavior: given a list of the numbers 1 to 8 and a batch size of 3, we expect three sublists:

@Test
void whenPartitionList_thenReturnThreeSubLists() {
    List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);

    Stream<List<Integer>> result = partitionList(source, 3);

    assertThat(result).containsExactlyInAnyOrder(
      List.of(1, 2, 3),
      List.of(4, 5, 6),
      List.of(7, 8)
    );
}

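3. Partitioning a Parallel Stream

Streams come with distinctive characteristics such as lazy evaluation and parallel processing. To take advantage of them, we create a custom Collector. Because the desired return type is a list of sublists, we also reuse part of what Collectors.toList() already provides, which we refer to as the downstream collector: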

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    return Collector.of( ... );
}

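We create the Collector with the static factory method Collector.of(). Its parameters are:

  • supplier: the supplier function of the new collector
  • accumulator: the accumulator function of the new collector
  • combiner: the combiner function of the new collector
  • finisher: the finisher function of the new collector
  • characteristics: the characteristics of the new collector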
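
As a warm-up before the partitioning collector, the following sketch maps each Collector.of() argument to its role using a much simpler, hand-written joining collector. CollectorOfDemo and joinWithComma are hypothetical names and not part of the partitioning solution:

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical example: a hand-written joining collector, shown only to map
// each Collector.of() argument to its role.
class CollectorOfDemo {

    static Collector<String, StringJoiner, String> joinWithComma() {
        return Collector.of(
          () -> new StringJoiner(","),   // supplier: creates an empty container
          StringJoiner::add,             // accumulator: folds one element into the container
          StringJoiner::merge,           // combiner: merges two partial containers
          StringJoiner::toString         // finisher: converts the container into the result
          // characteristics: a varargs parameter, left empty here
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b", "c").collect(joinWithComma())); // a,b,c
    }
}
```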

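3.1. The Supplier

We use a temporary object to accumulate values and split them into batches. This accumulator is typically hidden as an implementation detail: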

static class Accumulator<T, A> {
    private final List<T> values = new ArrayList<>();
    private final int batchSize;
    private A downstreamAccumulator;
    private final BiConsumer<A, List<T>> batchFullListener;

    // constructor
}

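When a new value is added and the list reaches the batch size, the accumulator notifies the listener with a copy of the batch and then clears the list: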

void add(T value) {
    values.add(value);
    if (values.size() == batchSize) {
        batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
        values.clear();
    }
}
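
The sketch below shows this behavior in isolation. The article elides the constructor, so the one used here (batch size, downstream accumulator, listener) is an assumption, and AccumulatorDemo and emittedBatches are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative sketch; the constructor signature is an assumption.
class AccumulatorDemo {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        // Assumed constructor: the article only shows a "// constructor" placeholder.
        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                // Hand over a copy, because the working list is cleared right after.
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }
    }

    // Feeds 1..count into an accumulator and returns the batches emitted so far.
    static List<List<Integer>> emittedBatches(int count, int batchSize) {
        List<List<Integer>> emitted = new ArrayList<>();
        Accumulator<Integer, List<List<Integer>>> acc =
          new Accumulator<Integer, List<List<Integer>>>(batchSize, emitted, (batches, batch) -> batches.add(batch));
        for (int i = 1; i <= count; i++) {
            acc.add(i);
        }
        return emitted;
    }
}
```

With eight values and a batch size of 3, emittedBatches(8, 3) yields [[1, 2, 3], [4, 5, 6]]. The values 7 and 8 stay inside the accumulator until something flushes them.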

Next, we create a Supplier that instantiates the Accumulator. Whenever a batch is full, it is handed over to the downstream accumulator:

Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
  batchSize,
  downstream.supplier().get(),
  downstream.accumulator()::accept
);

3.2. The Accumulator

The second argument of the custom Collector is a BiConsumer that takes an Accumulator and the new value. It simply delegates to the Accumulator's add method:

BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

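3.3. The Combiner

The combiner function merges two Accumulators. It first combines their downstream accumulators and then adds the values still pending in the second Accumulator to the first one: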

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> {
    acc1.downstreamAccumulator = downstream.combiner()
      .apply(acc1.downstreamAccumulator, acc2.downstreamAccumulator);
    acc2.values.forEach(acc1::add);
    return acc1;
};

We can refactor this logic into the Accumulator class:

static class Accumulator<T, A> {
    // ... existing fields ...

    Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
        this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
        other.values.forEach(this::add);
        return this;
    }
}

This reduces the combiner to a one-liner:

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
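
To see what a merge does to two partial results, the sketch below fills two accumulators by hand and combines them. The constructor, the class CombineDemo, and the helpers filled and merged are assumptions made for this illustration; the downstream accumulator is a plain list of batches:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

// Illustrative sketch; the constructor and the helper methods are assumptions.
class CombineDemo {

    static class Accumulator<T, A> {
        final List<T> values = new ArrayList<>();
        final int batchSize;
        A downstreamAccumulator;
        final BiConsumer<A, List<T>> batchFullListener;

        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }

        Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
            // Merge the completed batches first, then re-add the other side's pending values.
            this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
            other.values.forEach(this::add);
            return this;
        }
    }

    // Builds an accumulator that has already seen the numbers from..to.
    static Accumulator<Integer, List<List<Integer>>> filled(int from, int to, int batchSize) {
        Accumulator<Integer, List<List<Integer>>> acc = new Accumulator<Integer, List<List<Integer>>>(
          batchSize, new ArrayList<List<Integer>>(), (batches, batch) -> batches.add(batch));
        for (int i = from; i <= to; i++) {
            acc.add(i);
        }
        return acc;
    }

    // Merges two partial results the way the combiner does.
    static Accumulator<Integer, List<List<Integer>>> merged() {
        Accumulator<Integer, List<List<Integer>>> left = filled(1, 5, 3);   // batches [[1,2,3]], pending [4,5]
        Accumulator<Integer, List<List<Integer>>> right = filled(6, 10, 3); // batches [[6,7,8]], pending [9,10]
        return left.combine(right, (a, b) -> { a.addAll(b); return a; });
    }
}
```

After the merge, the completed batches are [[1, 2, 3], [6, 7, 8], [4, 5, 9]] and the value 10 is still pending. The batch [4, 5, 9] mixes leftovers from both sides, which illustrates why this collector cannot promise that batches follow the encounter order of a parallel stream.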

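3.4. The Finisher

The finisher converts the custom Accumulator into the final result, the List of Lists. Any values still pending form the last, possibly smaller batch, and we rely on the downstream collector to aggregate all batches: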

Function<Accumulator<T, A>, R> finisher = acc -> {
    if (!acc.values.isEmpty()) {
        downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
    }
    return downstream.finisher().apply(acc.downstreamAccumulator);
};

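3.5. Collector Characteristics

The collector is designed to be safe to use with parallel streams, where the final reduction runs concurrently across multiple threads. A natural consequence of this parallel processing is that the order of elements cannot be guaranteed, so we declare the UNORDERED characteristic: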

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    // ...
    return Collector.of(
      supplier,
      accumulator,
      combiner,
      finisher,
      Collector.Characteristics.UNORDERED
    );
}

3.6. The Complete Solution

Putting all the pieces together gives the complete implementation:

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
      batchSize, 
      downstream.supplier().get(), 
      downstream.accumulator()::accept
    );

    BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

    BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());

    Function<Accumulator<T, A>, R> finisher = acc -> {
        if (!acc.values.isEmpty()) {
            downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
        }
        return downstream.finisher().apply(acc.downstreamAccumulator);
    };
    
    return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}

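We test the partitioning of a parallel stream with the numbers 1 to 8 and a batch size of 3. Because element order is not guaranteed, the test only checks the batch sizes: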

@Test
void whenPartitionStream_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    List<List<Integer>> result = partitionStream(source, 3);

    assertThat(result)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}
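
Because the Accumulator constructor is elided above, the following self-contained sketch fills it in with an assumed signature and assembles everything into one compilable class. It also plugs in a different downstream collector, Collectors.counting(), to count batches instead of listing them. PartitionCollectorDemo, batchesOf, and countBatches are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch; the Accumulator constructor and the demo helpers are assumptions.
class PartitionCollectorDemo {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }

        Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
            this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
            other.values.forEach(this::add);
            return this;
        }
    }

    static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
        return Collector.<T, Accumulator<T, A>, R>of(
          () -> new Accumulator<>(batchSize, downstream.supplier().get(), downstream.accumulator()),
          Accumulator::add,
          (acc1, acc2) -> acc1.combine(acc2, downstream.combiner()),
          acc -> {
              // Flush the last, possibly smaller batch before finishing.
              if (!acc.values.isEmpty()) {
                  downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
              }
              return downstream.finisher().apply(acc.downstreamAccumulator);
          },
          Collector.Characteristics.UNORDERED);
    }

    // Sequential stream: the batches are collected into a list.
    static List<List<Integer>> batchesOf(int count, int batchSize) {
        return IntStream.rangeClosed(1, count).boxed()
          .collect(partitionBySize(batchSize, Collectors.toList()));
    }

    // Parallel stream with a different downstream collector: only the number of batches is kept.
    static long countBatches(int count, int batchSize) {
        Long batches = IntStream.rangeClosed(1, count).boxed().parallel()
          .collect(partitionBySize(batchSize, Collectors.counting()));
        return batches;
    }
}
```

On a sequential stream the combiner is never invoked, so batchesOf(8, 3) returns [[1, 2, 3], [4, 5, 6], [7, 8]]. On the parallel stream the contents of each batch may vary between runs, but countBatches(8, 3) is always 3, since every flushed batch except the last holds exactly three elements.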

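4. Partitioning a Stream Using Guava

To avoid reinventing the wheel, we can rely on a well-established third-party library such as Google Guava. Guava offers a concise way to break a Stream into an Iterable of Lists of the same element type.

First, we add the Maven dependency: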

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

Then we use the static method Iterables.partition():

static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
    return Iterables.partition(source::iterator, batchSize);
}
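
For readers who want to see the idea without the dependency, here is a minimal, dependency-free sketch that batches an Iterator lazily. It is not Guava's implementation, and LazyPartition, partition, and demo are hypothetical names introduced for this illustration:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

// Hypothetical, dependency-free sketch; this is not Guava's implementation.
class LazyPartition {

    static <T> Iterator<List<T>> partition(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be greater than 0, but was " + batchSize);
        }
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Pull at most batchSize elements; nothing beyond this batch is consumed yet.
                List<T> batch = new ArrayList<>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch;
            }
        };
    }

    // Collects all batches of the numbers 1..8 with a batch size of 3.
    static List<List<Integer>> demo() {
        Iterator<List<Integer>> batches = partition(Stream.of(1, 2, 3, 4, 5, 6, 7, 8).iterator(), 3);
        List<List<Integer>> result = new ArrayList<>();
        batches.forEachRemaining(result::add);
        return result;
    }
}
```

Each call to next() reads only as many elements as one batch needs, so the source is consumed on demand rather than up front.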

In the test, the return type is now an Iterable. Because it wraps a Stream, it can be traversed only once:

@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    Iterable<List<Integer>> result = partitionUsingGuava(source, 3);

    assertThat(result)
      .map(ArrayList::new)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}

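5. Conclusion

In this article, we looked at several ways to partition a Stream in Java:

  1. List partitioning: the traditional approach based on index calculation
  2. Parallel stream partitioning: a custom Collector that is safe to use with parallel streams
  3. Guava: a simpler implementation based on Iterables.partition()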

Comparison of Approaches

Approach | Thread-safe | Parallel support | Implementation complexity
List partitioning
Custom Collector
Guava

The complete example code is available on GitHub.

Original title: Partition a Stream in Java