1. 概述

上一篇文章中,我们介绍了parallel-collectors,这是一个无需依赖的小型库,它为自定义线程池上的Stream API 提供了并行处理能力。

Project Loom 是将轻量级虚拟线程(以前称为纤)引入JVM的有组织努力,这项工作在JDK21中最终完成。

让我们看看如何在Parallel Collectors中利用这一点。

2. Maven依赖

如果我们想开始使用这个库,我们需要在Maven的pom.xml文件中添加一个条目:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>3.0.0</version>
</dependency>

或者在Gradle构建文件中添加一行:

compile 'com.pivovarit:parallel-collectors:3.0.0'

最新版本可以在Maven中央仓库找到。

3. 使用操作系统线程与虚拟线程的并行处理

3.1. 操作系统线程并行性

让我们首先了解为什么使用虚拟线程进行并行处理是一个大问题。

我们将从一个简单的示例开始。我们需要一个要并行化的操作,它将是延迟的字符串连接:

private static String fetchById(int id) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // ignore shamelessly
    }
    return "user-" + id;
}

我们还需要测量执行时间的自定义代码:

private static <T> T timed(Supplier<T> supplier) {
    var before = Instant.now();
    T result = supplier.get();
    var after = Instant.now();
    log.info("Execution time: {} ms", Duration.between(before, after).toMillis());
    return result;
}

现在,让我们创建一个简单的并行Stream处理示例,其中我们创建n个元素,然后在n个线程上并行处理它们,每个线程的并行度为n:

@Test
public void processInParallelOnOSThreads() {
    int parallelProcesses = 5_000;
    var e = Executors.newFixedThreadPool(parallelProcesses);

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
      .collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), e, parallelProcesses))
      .join());

    log.info("{}", result);
}

运行时,我们可以观察到它确实完成了任务,因为我们不需要等待5000秒才能得到结果:

Execution time: 1321 ms
[user-0, user-1, user-2, ...]

但是,如果我们尝试将并行处理的元素数量增加到20_000:

[2.795s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (...)
[2.795s][warning][os,thread] Failed to start the native thread for java.lang.Thread "pool-1-thread-16111"

基于操作系统线程的方法不具有可扩展性,因为创建线程很昂贵,而且很快就会达到资源限制。

让我们来看看切换到虚拟线程会发生什么。

3.2. 虚拟线程并行性

在Java 21之前,很难为线程池配置提供合理的默认值。幸运的是,虚拟线程不需要任何配置——我们可以创建任意多的线程,它们会在一个共享的ForkJoinPool实例上进行内部调度,非常适合运行阻塞操作!

如果我们正在运行Parallel Collectors 3.x,我们可以轻松地利用虚拟线程:

@Test
public void processInParallelOnVirtualThreads() {
    int parallelProcesses = 5_000;

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
      .collect(ParallelCollectors.parallel(i -> fetchById(i), toList()))
      .join());
}

正如我们所见,这就像省略executorparallelism参数一样简单,因为虚拟线程是默认执行工具。

如果尝试运行它,我们会看到它实际上比原始示例完成得更快:

Execution time: 1101 ms
[user-0, user-1, user-2, ...]

这是因为我们创建了5000个虚拟线程,并且使用了一组非常有限的OS线程进行调度。

让我们尝试将并行度提高到20_000,这是经典Executor无法实现的:

Execution time: 1219 ms
[user-0, user-1, user-2, ...]

不仅成功执行,而且比在OS线程上四倍小的工作负载完成得更快!

让我们将并行度提高到100_000,看看会发生什么:

Execution time: 1587 ms
[user-0, user-1, user-2, ...]

虽然观察到了显著的开销,但仍然可以正常工作。

如果我们将并行度提高到1_000_000呢?

Execution time: 6416 ms
[user-0, user-1, user-2, ...]

2_000_000?

Execution time: 12906 ms
[user-0, user-1, user-2, ...]

5_000_000?

Execution time: 25952 ms
[user-0, user-1, user-2, ...]

如我们所见,我们可以轻松地扩展到以前无法用操作系统线程实现的高并行度。 这一优势,加上在较小并行工作负载上的性能提升,是使用虚拟线程对阻塞操作进行并行处理的主要好处。

3.3. 虚拟线程与Parallel Collectors的旧版本

最简单的方法是升级到库的最新版本来利用虚拟线程,但如果无法做到,我们也可以在运行在JDK21上的2.x.y版本中实现这一点。

诀窍是手动提供Executors.newVirtualThreadPerTaskExecutor()作为执行器,并设置最大并行度为Integer.MAX_VALUE

@Test
public void processInParallelOnVirtualThreadsParallelCollectors2() {
    int parallelProcesses = 100_000;

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
      .collect(ParallelCollectors.parallel(
        i -> fetchById(i), toList(), 
        Executors.newVirtualThreadPerTaskExecutor(), Integer.MAX_VALUE))
      .join());

    log.info("{}", result);
}

5. 总结

在这篇文章中,我们有机会看到如何轻松地使用Parallel Collectors库利用虚拟线程,它在性能上明显优于传统的基于操作系统线程的解决方案。我们的测试机器在大约16000个线程时达到了资源限制,而我们能够轻易地扩展到数百万个虚拟线程。

如往常一样,代码示例可以在GitHub上找到。