1. 概述
Java 8 引入了流(Stream)的概念,作为对大量数据进行批量操作的高效方式。在支持并发的环境中,可以获取并行流。
这些流可以带来性能提升,但代价是多线程开销。
在这个快速教程中,我们将探讨流API的一个主要限制,并了解如何让并行流与自定义线程池一起工作。或者,有一个库可以处理这个问题。
2. 并行流
让我们从一个简单的例子开始:在任何类型的Collection
上调用parallelStream()
方法,这将返回一个可能并行的流:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在这种流中,默认的处理会使用ForkJoinPool.commonPool()
,这是整个应用共享的线程池。
3. 自定义线程池
实际上,我们在处理流时可以传递一个自定义线程池。
以下示例演示如何使用一个自定义线程池,计算从1到1,000,000(包括)的所有long值的和:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我们使用了ForkJoinPool
构造函数,设置并行度为4。对于不同的环境,需要进行一些实验来确定最佳值,但一般来说,可以根据CPU核心数量来选择。
接下来,我们在并行流的内容上执行处理,通过reduce
调用来求和它们。
这个简单的例子可能无法完全展示使用自定义线程池的全部好处,但在不希望占用默认线程池处理长时间运行任务(如网络数据源)或应用内其他组件正在使用默认线程池的情况下,其优势就很明显了。
如果运行上面的测试方法,它会通过。目前为止,一切正常。
然而,如果我们像在测试方法中那样,在正常方法中实例化ForkJoinPool
类,可能会导致OutOfMemoryError
。
现在,让我们更深入地了解内存泄漏的原因。
4. 警惕内存泄漏
正如我们之前讨论的,默认线程池是整个应用共享的静态线程池实例。
因此,如果我们使用默认线程池,不会发生内存泄漏。
现在回顾我们的测试方法。在测试方法中,我们创建了一个ForkJoinPool
对象。当测试方法结束时,自定义线程池对象不会被引用并被垃圾回收,而是等待分配新的任务。
也就是说,每次我们调用测试方法,都会创建一个新的customThreadPool
对象,而不会释放它。
解决这个问题的方法很简单:在执行完方法后,关闭customThreadPool
对象:
try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
customThreadPool.shutdown();
}
5. 总结
我们简要介绍了如何使用自定义线程池运行并行流。在适当的环境和正确的并行度设置下,某些情况下可以获得性能提升。
如果创建自定义线程池,我们需要记住在完成后调用其shutdown()
方法以避免内存泄漏。
本文中引用的完整代码示例可以在GitHub上找到。