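1. Overview

This article explores how to process Stream data in batches in Java. We'll look at implementations based on both native Java features and third-party libraries, to help developers handle large data streams efficiently.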

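2. What Is Batch Processing of Stream Data?

Batch processing of Stream data in Java means splitting a large dataset into smaller, more manageable batches and processing them in sequence. In this scenario, the data comes from a data stream.

Batch processing offers several advantages:

  • ✅ More efficient data processing
  • ✅ The ability to handle very large datasets that can't be loaded into memory all at once
  • ✅ Support for parallel processing across multiple processors

It also brings challenges to keep in mind:

  • ⚠️ Batch size: batches that are too small increase processing overhead, while batches that are too large increase processing latency
  • ⚠️ State management: maintaining state across batches is complex in distributed systems
  • ⚠️ Fault tolerance: recovering from failures requires saving a large amount of intermediate state

This article focuses on the core implementation of batch processing with Java Streams and doesn't go into these more complex problems.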

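3. Batch Processing Using the Java Streams API

First, a key point to keep in mind:

⚠️ Important: a stream can be consumed only once; operating on it a second time throws an IllegalStateException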

Stream<String> coursesStream = Stream.of("Java", "Frontend", "Backend", "Fullstack");
Stream<Integer> coursesStreamLength = coursesStream.map(String::length);
// The second operation throws java.lang.IllegalStateException
Stream<String> emphasisCourses = coursesStream.map(course -> course + "!");
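
The single-use rule is easy to check directly. The sketch below uses a hypothetical class, SingleUseStreamDemo, that is not part of the original example. It shows the second operation failing, along with one common workaround: wrapping stream creation in a Supplier so that every use gets a fresh stream.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the name is not from the original article.
final class SingleUseStreamDemo {

    // Returns true if linking a second operation to an already-used stream fails.
    public static boolean secondUseFails() {
        Stream<String> courses = Stream.of("Java", "Frontend", "Backend", "Fullstack");
        courses.map(String::length); // the first operation links the stream
        try {
            courses.map(course -> course + "!"); // second operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // A Supplier builds a new stream on every call, so the data can be traversed repeatedly.
    public static List<String> emphasize(Supplier<Stream<String>> source) {
        return source.get()
            .map(course -> course + "!")
            .collect(Collectors.toList());
    }
}
```

Calling emphasize twice with the same Supplier works, because each call starts from a new Stream instance.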

Next, let's prepare the test data:

// Data stream: the integers 0 to 33
Stream<Integer> data = IntStream.range(0, 34).boxed();

// Batch size
private static final int BATCH_SIZE = 10;

// Expected batches
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
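
The expected batches follow from simple arithmetic: 34 elements with a batch size of 10 give three full batches and a final batch of 4. A minimal sketch of that calculation, using a hypothetical helper class BatchMath that is not part of the original code:

```java
// Hypothetical helper; the names are illustrative and not from the original article.
final class BatchMath {

    // Number of batches needed: the ceiling of totalElements / batchSize.
    public static int batchCount(int totalElements, int batchSize) {
        validate(totalElements, batchSize);
        return (totalElements + batchSize - 1) / batchSize;
    }

    // Size of the final batch; equals batchSize when the total divides evenly.
    public static int lastBatchSize(int totalElements, int batchSize) {
        validate(totalElements, batchSize);
        if (totalElements == 0) {
            return 0;
        }
        int remainder = totalElements % batchSize;
        return remainder == 0 ? batchSize : remainder;
    }

    private static void validate(int totalElements, int batchSize) {
        if (totalElements < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("totalElements must be >= 0 and batchSize > 0");
        }
    }
}
```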

4. Batch Processing Using an Iterator

We can implement batching with a custom Iterator:

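import static java.util.Spliterator.ORDERED;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CustomBatchIterator<T> implements Iterator<List<T>> {
    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> iterator;

    private CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.iterator = sourceIterator;
    }

    @Override
    public List<T> next() {
        // Honor the Iterator contract instead of returning an empty batch
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        prepareNextBatch();
        return currentBatch;
    }

    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    // Pulls up to batchSize elements from the source into a new list
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(iterator.next());
        }
    }

    // Static factory method: turns a stream of elements into a stream of batches
    public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
        return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
    }

    // Adapts an Iterator to a sequential, ordered Stream
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
    }
}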

Let's verify it with a test:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    CustomBatchIterator.batchStreamOf(data, BATCH_SIZE).forEach(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
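
One property of the iterator-based approach is that batches are built on demand, so it can also be applied to a source that never ends. The following is a minimal sketch of that idea with a hypothetical helper, LazyBatches, written to be self-contained rather than reusing the class above. It takes only the first three batches of an unbounded stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper illustrating the iterator-based idea; not the article's class.
final class LazyBatches {

    // Wraps the source iterator so each next() call pulls at most batchSize elements.
    public static <T> Stream<List<T>> of(Stream<T> source, int batchSize) {
        Iterator<T> it = source.iterator();
        Iterator<List<T>> batches = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!it.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(batchSize);
                while (it.hasNext() && batch.size() < batchSize) {
                    batch.add(it.next());
                }
                return batch;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(batches, Spliterator.ORDERED), false);
    }

    // Takes the first three batches from an unbounded stream of integers.
    public static List<List<Integer>> firstThreeBatches(int batchSize) {
        return of(Stream.iterate(0, i -> i + 1), batchSize)
            .limit(3)
            .collect(Collectors.toList());
    }
}
```

Because limit(3) stops pulling once three batches have been produced, the unbounded source is never fully traversed.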

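5. Batch Processing Using the Collection API

A more concise approach relies on collectors. Note that grouping by it / BATCH_SIZE keys each element by its value, which matches its position only because the test data is the sequence 0 to 33, and that the whole stream is collected into memory: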

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
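
Grouping by it / BATCH_SIZE works in the test above because each value happens to equal its position. For arbitrary elements, one option is to group by index instead. The sketch below assumes the data has already been collected into a List. The class IndexBatches and its partition method are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper; groups by element position rather than by element value.
final class IndexBatches {

    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // TreeMap keeps the batch keys (0, 1, 2, ...) in ascending order.
        TreeMap<Integer, List<T>> grouped = IntStream.range(0, items.size())
            .boxed()
            .collect(Collectors.groupingBy(
                index -> index / batchSize,
                TreeMap::new,
                Collectors.mapping(items::get, Collectors.toList())));
        return new ArrayList<>(grouped.values());
    }
}
```

Using a TreeMap as the map factory makes the batch order explicit instead of relying on the iteration order of the default map.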

We can also process the stream in parallel (note that the order of execution is not guaranteed):

@Test
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.parallel()
      .collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
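
Once the batches exist, each one can also be processed independently of the others. The sketch below uses a hypothetical class, ParallelBatchSums, to sum every batch with parallelStream(). The batches may be handled on different threads in any order, while Collectors.toList() still returns the results in encounter order.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; the name is illustrative and not from the original article.
final class ParallelBatchSums {

    // Computes one sum per batch; the work per batch may run on different threads.
    public static List<Integer> sumEachBatch(List<List<Integer>> batches) {
        return batches.parallelStream()
            .map(batch -> batch.stream().mapToInt(Integer::intValue).sum())
            .collect(Collectors.toList());
    }
}
```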

6. Batch Processing Using RxJava

First, we add the dependency:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.5</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Observable.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

7. Batch Processing Using Vavr

First, we add the dependency:

<dependency>
    <groupId>io.vavr</groupId>
    <artifactId>vavr</artifactId>
    <version>1.0.0-alpha-4</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
    List<List<Integer>> result = Stream.ofAll(data)
      .toList()
      .grouped(BATCH_SIZE)
      .toList();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

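⚠️ Note: List here comes from io.vavr.collection, not java.util.List, and Stream is likewise io.vavr.collection.Stream. The expected batches must therefore be declared as Vavr lists too, since a Vavr List doesn't compare equal to a java.util.List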

8. Batch Processing Using Reactor

First, we add the dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Flux.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

9. Batch Processing Using Apache Commons

First, we add the dependency:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>(ListUtils
      .partition(data.collect(Collectors.toList()), BATCH_SIZE));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

⚠️ Note: the Stream must first be converted to a List before it can be partitioned
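
To see what partitioning an already materialized list involves, here is a JDK-only sketch of the same idea based on List.subList. It is not how ListUtils.partition is implemented, only an illustration. The class SubListPartition is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only helper; not the Apache Commons implementation.
final class SubListPartition {

    public static <T> List<List<T>> partition(List<T> list, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < list.size(); start += batchSize) {
            int end = Math.min(start + batchSize, list.size());
            // subList returns a view backed by the original list, so no elements are copied
            batches.add(list.subList(start, end));
        }
        return batches;
    }
}
```

Since each batch is a view, later structural changes to the source list would invalidate the batches. Copying each sublist into a new ArrayList avoids that at the cost of extra memory.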

10. Batch Processing Using Guava

First, we add the dependency:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Iterators.partition(data.iterator(), BATCH_SIZE).forEachRemaining(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

11. Batch Processing Using Cyclops

First, we add the dependency:

<dependency>
    <groupId>com.oath.cyclops</groupId>
    <artifactId>cyclops</artifactId>
    <version>10.4.1</version>
</dependency>

Then the implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    ReactiveSeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

A lazy variant (using LazySeq):

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    LazySeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

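12. Conclusion

This article covered several ways to process Stream data in batches in Java:

  • Native Java APIs (Iterator, Collection API)
  • Popular third-party libraries (RxJava, Vavr, Reactor)
  • Utility libraries (Apache Commons, Guava, Cyclops)

Each approach suits a different scenario:

  • Simple cases: prefer the native Java APIs
  • Reactive programming: choose RxJava or Reactor
  • Functional programming: Vavr or Cyclops is a better fit
  • General-purpose utilities: Apache Commons or Guava

The complete code for all examples is available on GitHub.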


Original title: Batch Processing of Stream Data in Java