1. 概述

Java 8 引入了流(Stream)的概念,作为对大量数据进行批量操作的高效方式。在支持并发的环境中,可以获取并行流。

这些流可以带来性能提升,但代价是多线程开销。

在这个快速教程中,我们将探讨流API的一个主要限制,并了解如何让并行流与自定义线程池一起工作。或者,有一个库可以处理这个问题

2. 并行流

让我们从一个简单的例子开始:在任何类型的Collection上调用parallelStream()方法,这将返回一个可能并行的流:

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
        
    assertTrue(parallelStream.isParallel());
}

在这种流中,默认的处理会使用ForkJoinPool.commonPool(),这是整个应用共享的线程池。

3. 自定义线程池

实际上,我们在处理流时可以传递一个自定义线程池。

以下示例演示如何使用一个自定义线程池,计算从1到1,000,000(包括)的所有long值的和:

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
    
    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
 
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

我们使用了ForkJoinPool构造函数,设置并行度为4。对于不同的环境,需要进行一些实验来确定最佳值,但一般来说,可以根据CPU核心数量来选择。

接下来,我们在并行流的内容上执行处理,通过reduce调用来求和它们。

这个简单的例子可能无法完全展示使用自定义线程池的全部好处,但在不希望占用默认线程池处理长时间运行任务(如网络数据源)或应用内其他组件正在使用默认线程池的情况下,其优势就很明显了。

如果运行上面的测试方法,它会通过。目前为止,一切正常。

然而,如果我们像在测试方法中那样,在正常方法中实例化ForkJoinPool类,可能会导致OutOfMemoryError

现在,让我们更深入地了解内存泄漏的原因。

4. 警惕内存泄漏

正如我们之前讨论的,默认线程池是整个应用共享的静态线程池实例。

因此,如果我们使用默认线程池,不会发生内存泄漏。

现在回顾我们的测试方法。在测试方法中,我们创建了一个ForkJoinPool对象。当测试方法结束时,自定义线程池对象不会被引用并被垃圾回收,而是等待分配新的任务

也就是说,每次我们调用测试方法,都会创建一个新的customThreadPool对象,而不会释放它。

解决这个问题的方法很简单:在执行完方法后,关闭customThreadPool对象:

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. 总结

我们简要介绍了如何使用自定义线程池运行并行流。在适当的环境和正确的并行度设置下,某些情况下可以获得性能提升。

如果创建自定义线程池,我们需要记住在完成后调用其shutdown()方法以避免内存泄漏。

本文中引用的完整代码示例可以在GitHub上找到。