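1. Overview
This article looks at how to process the data of a Java Stream in batches. We show implementations based on native Java features as well as on third-party libraries, so that large data streams can be handled efficiently.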
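2. What Is Batch Processing of Stream Data?
Batch processing of stream data in Java means splitting a large data set into smaller, manageable batches and processing them in sequence. In this scenario, the data source is a stream.
Batch processing has several advantages:
- ✅ It can improve data-processing efficiency
- ✅ It handles very large data sets that cannot be loaded into memory all at once
- ✅ It allows batches to be processed in parallel on multiple processors
It also comes with challenges:
- ⚠️ Batch size: batches that are too small add per-batch overhead, while batches that are too large increase processing latency
- ⚠️ State management: maintaining state across batches is complex in distributed systems
- ⚠️ Fault tolerance: recovering from a failure requires saving a large amount of intermediate state
This article focuses on the core batching implementations for Java Streams and does not address these harder problems.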
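To make the idea of sequential batches concrete, here is a minimal sketch using only the JDK. The class `BatchConsumerSketch` and the method `forEachBatch` are hypothetical names introduced for illustration. The method hands the source to a callback one batch at a time, so only the current batch needs to be held in memory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

final class BatchConsumerSketch {

    // Feeds the source to `handler` one batch at a time and returns the number of batches.
    // Assumption: batchSize is greater than zero.
    static <T> int forEachBatch(Iterator<T> source, int batchSize, Consumer<List<T>> handler) {
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                handler.accept(batch);
                batch = new ArrayList<>(batchSize); // start a fresh batch
                batches++;
            }
        }
        if (!batch.isEmpty()) { // the last batch may be smaller than batchSize
            handler.accept(batch);
            batches++;
        }
        return batches;
    }
}
```

With 34 elements and a batch size of 10, the handler is called four times, and the last batch holds the remaining four elements.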
3. Batch Processing with the Java Streams API
A few key points first:
- The Streams API, introduced in Java 8, is the core tool
- We mainly work with the Stream interface
⚠️ Important: a stream can be consumed only once; operating on it a second time throws an IllegalStateException:
Stream<String> coursesStream = Stream.of("Java", "Frontend", "Backend", "Fullstack");
Stream<Integer> coursesStreamLength = coursesStream.map(String::length);
// The second operation on the same stream throws java.lang.IllegalStateException
Stream<String> emphasisCourses = coursesStream.map(course -> course + "!");
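One common way around the single-use restriction is to wrap the stream's creation in a `Supplier`, so that every pipeline starts from a fresh stream. The sketch below assumes the same four course names; `StreamReuseSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamReuseSketch {

    // Each call to get() builds a brand-new stream over the same data.
    static Supplier<Stream<String>> courses() {
        return () -> Stream.of("Java", "Frontend", "Backend", "Fullstack");
    }

    static List<Integer> lengths() {
        return courses().get().map(String::length).collect(Collectors.toList());
    }

    static List<String> emphasized() {
        return courses().get().map(course -> course + "!").collect(Collectors.toList());
    }

    // Returns true if operating twice on one stream instance fails as described above.
    static boolean reuseFails() {
        Stream<String> stream = courses().get();
        stream.map(String::length);
        try {
            stream.map(course -> course + "!");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

Both `lengths()` and `emphasized()` succeed because each obtains its own stream from the supplier, while `reuseFails()` reproduces the exception on a shared instance.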
Next, we prepare the test data:
// Data stream: the integers 0 to 33
Stream<Integer> data = IntStream.range(0, 34).boxed();
// Batch size
private final int BATCH_SIZE = 10;
// Expected batches
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
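The four expected batches follow directly from the arithmetic: 34 elements with a batch size of 10 give three full batches plus one batch of four. As a sketch, the same lists can be derived programmatically; `ExpectedBatchesSketch` and its methods are hypothetical helpers, not part of the original test class.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ExpectedBatchesSketch {

    // Number of batches needed for `total` elements: the ceiling of total / batchSize.
    static int batchCount(int total, int batchSize) {
        return (total + batchSize - 1) / batchSize;
    }

    // Builds the expected batches for the integers 0 (inclusive) to total (exclusive).
    static List<List<Integer>> expectedBatches(int total, int batchSize) {
        return IntStream.range(0, batchCount(total, batchSize))
            .mapToObj(batch -> IntStream
                .range(batch * batchSize, Math.min((batch + 1) * batchSize, total))
                .boxed()
                .collect(Collectors.toList()))
            .collect(Collectors.toList());
    }
}
```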
4. Batching with an Iterator
We can implement batching with a custom Iterator:
import static java.util.Spliterator.ORDERED;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CustomBatchIterator<T> implements Iterator<List<T>> {
    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> iterator;

    private CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.iterator = sourceIterator;
    }

    @Override
    public List<T> next() {
        prepareNextBatch();
        return currentBatch;
    }

    // There is another batch as long as the source still has elements
    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    // Pulls up to batchSize elements from the source into a new list
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(iterator.next());
        }
    }

    // Static factory method
    public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
        return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
    }

    // Wraps an iterator in a sequential, ordered stream
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
    }
}
We can verify it with a test:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    CustomBatchIterator.batchStreamOf(data, BATCH_SIZE).forEach(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
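Because an iterator-based batcher pulls elements only on demand, it can also be applied to a source that never ends, as long as the pipeline is cut off with a short-circuiting operation such as `limit`. The sketch below illustrates this with a compact stand-in for the batching iterator so that it is self-contained; `LazyBatchingSketch`, `batched`, and `firstTwoBatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyBatchingSketch {

    // Compact stand-in for an iterator-based batcher (assumes batchSize > 0).
    static <T> Stream<List<T>> batched(Stream<T> source, int batchSize) {
        Iterator<T> elements = source.iterator();
        Iterator<List<T>> batches = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return elements.hasNext();
            }

            @Override
            public List<T> next() {
                List<T> batch = new ArrayList<>(batchSize);
                while (elements.hasNext() && batch.size() < batchSize) {
                    batch.add(elements.next());
                }
                return batch;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(batches, Spliterator.ORDERED), false);
    }

    // Takes only the first two batches of an endless source of integers.
    static List<List<Integer>> firstTwoBatches() {
        return batched(Stream.iterate(0, i -> i + 1), 10)
            .limit(2)
            .collect(Collectors.toList());
    }
}
```

Here `firstTwoBatches()` returns the batches 0 to 9 and 10 to 19 without ever materializing the whole source.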
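5. Batching with the Collection API
A more concise approach groups the elements with Collectors.groupingBy. Note that the grouping key it / BATCH_SIZE is computed from the element's value, so this works here only because the data consists of the consecutive integers 0 to 33:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
        .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}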
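For elements that are not consecutive integers, the grouping key has to come from each element's position rather than its value. The following sketch assumes the data is already available as a `List` with cheap index access; `IndexGroupingSketch` and `batchByIndex` are hypothetical names. It groups the indices and maps each index back to its element, using a `TreeMap` so the batches come out in batch order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class IndexGroupingSketch {

    // Groups elements by position (index / batchSize) rather than by value.
    static <T> List<List<T>> batchByIndex(List<T> items, int batchSize) {
        TreeMap<Integer, List<T>> grouped = IntStream.range(0, items.size())
            .boxed()
            .collect(Collectors.groupingBy(
                index -> index / batchSize,
                TreeMap::new, // keeps the batches sorted by batch number
                Collectors.mapping(items::get, Collectors.toList())));
        return new ArrayList<>(grouped.values());
    }
}
```

For example, batching the five strings "a" to "e" with a batch size of 2 yields the batches [a, b], [c, d], and [e].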
✅ The same collector also works on a parallel stream (note that the order in which elements are processed is not guaranteed):
@Test
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.parallel()
        .collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
        .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
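Parallelism can also be applied one level up: build the batches first, then process whole batches in parallel. The sketch below is one possible arrangement using only the JDK; the per-batch work (summing) and the names `ParallelBatchSketch`, `partition`, and `sumBatchesInParallel` are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class ParallelBatchSketch {

    // Splits a list into consecutive batches; each batch is a view onto the original list.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            batches.add(items.subList(from, Math.min(from + batchSize, items.size())));
        }
        return batches;
    }

    // Hypothetical per-batch work: sum every batch, with batches handled in parallel.
    static List<Integer> sumBatchesInParallel(List<Integer> items, int batchSize) {
        return partition(items, batchSize)
            .parallelStream()
            .map(batch -> batch.stream().mapToInt(Integer::intValue).sum())
            .collect(Collectors.toList()); // the result list keeps the batch order
    }
}
```

For the integers 0 to 33 and a batch size of 10, the result is [45, 145, 245, 126]. The batches may be summed in any order, but the collected list still follows the order of the batches.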
6. Batching with RxJava
Add the dependency:
<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.5</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Observable.fromStream(data)
        .buffer(BATCH_SIZE)
        .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
7. Batching with Vavr
Add the dependency:
<dependency>
    <groupId>io.vavr</groupId>
    <artifactId>vavr</artifactId>
    <version>1.0.0-alpha-4</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
    List<List<Integer>> result = Stream.ofAll(data)
        .toList()
        .grouped(BATCH_SIZE)
        .toList();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
⚠️ Note: the List used here is io.vavr.collection.List, not java.util.List, and Stream.ofAll likewise belongs to io.vavr.collection.Stream.
8. Batching with Reactor
Add the dependency:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Flux.fromStream(data)
        .buffer(BATCH_SIZE)
        .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
9. Batching with Apache Commons
Add the dependency:
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>(ListUtils
        .partition(data.collect(Collectors.toList()), BATCH_SIZE));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
⚠️ Note: the Stream must first be collected into a List, so the whole data set is held in memory before it is partitioned.
10. Batching with Guava
Add the dependency:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Iterators.partition(data.iterator(), BATCH_SIZE).forEachRemaining(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
11. Batching with Cyclops
Add the dependency:
<dependency>
    <groupId>com.oath.cyclops</groupId>
    <artifactId>cyclops</artifactId>
    <version>10.4.1</version>
</dependency>
Implementation:
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    ReactiveSeq.fromStream(data)
        .grouped(BATCH_SIZE)
        .toList()
        .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
✅ Lazy variant (using LazySeq):
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    LazySeq.fromStream(data)
        .grouped(BATCH_SIZE)
        .toList()
        .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}
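12. Conclusion
This article covered several ways to process Stream data in batches in Java:
- Native Java APIs (Iterator, Collection API)
- Popular third-party libraries (RxJava, Vavr, Reactor)
- Utility libraries (Apache Commons, Guava, Cyclops)
Each approach suits a different situation:
- ✅ Simple cases: prefer the native Java APIs
- ✅ Reactive programming: choose RxJava or Reactor
- ✅ Functional programming: Vavr or Cyclops is a better fit
- ✅ Utility-style helpers: Apache Commons or Guava
The complete code for all examples is available on GitHub.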