1. 概述
在上一篇文章中,我们介绍了parallel-collectors,这是一个无需依赖的小型库,它为自定义线程池上的Stream
API 提供了并行处理能力。
Project Loom 是将轻量级虚拟线程(以前称为纤)引入JVM的有组织努力,这项工作在JDK21中最终完成。
让我们看看如何在Parallel Collectors中利用这一点。
2. Maven依赖
如果我们想开始使用这个库,我们需要在Maven的pom.xml
文件中添加一个条目:
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>3.0.0</version>
</dependency>
或者在Gradle构建文件中添加一行:
compile 'com.pivovarit:parallel-collectors:3.0.0'
最新版本可以在Maven中央仓库找到。
3. 使用操作系统线程与虚拟线程的并行处理
3.1. 操作系统线程并行性
让我们首先了解为什么使用虚拟线程进行并行处理是一个大问题。
我们将从一个简单的示例开始。我们需要一个要并行化的操作,它将是延迟的字符串连接:
private static String fetchById(int id) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore shamelessly
}
return "user-" + id;
}
我们还需要测量执行时间的自定义代码:
private static <T> T timed(Supplier<T> supplier) {
var before = Instant.now();
T result = supplier.get();
var after = Instant.now();
log.info("Execution time: {} ms", Duration.between(before, after).toMillis());
return result;
}
现在,让我们创建一个简单的并行Stream
处理示例,其中我们创建n个元素,然后在n个线程上并行处理它们,每个线程的并行度为n:
@Test
public void processInParallelOnOSThreads() {
int parallelProcesses = 5_000;
var e = Executors.newFixedThreadPool(parallelProcesses);
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), e, parallelProcesses))
.join());
log.info("{}", result);
}
运行时,我们可以观察到它确实完成了任务,因为我们不需要等待5000秒才能得到结果:
Execution time: 1321 ms
[user-0, user-1, user-2, ...]
但是,如果我们尝试将并行处理的元素数量增加到20_000:
[2.795s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (...)
[2.795s][warning][os,thread] Failed to start the native thread for java.lang.Thread "pool-1-thread-16111"
基于操作系统线程的方法不具有可扩展性,因为创建线程很昂贵,而且很快就会达到资源限制。
让我们来看看切换到虚拟线程会发生什么。
3.2. 虚拟线程并行性
在Java 21之前,很难为线程池配置提供合理的默认值。幸运的是,虚拟线程不需要任何配置——我们可以创建任意多的线程,它们会在一个共享的ForkJoinPool实例上进行内部调度,非常适合运行阻塞操作!
如果我们正在运行Parallel Collectors 3.x,我们可以轻松地利用虚拟线程:
@Test
public void processInParallelOnVirtualThreads() {
int parallelProcesses = 5_000;
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList()))
.join());
}
正如我们所见,这就像省略executor
和parallelism
参数一样简单,因为虚拟线程是默认执行工具。
如果尝试运行它,我们会看到它实际上比原始示例完成得更快:
Execution time: 1101 ms
[user-0, user-1, user-2, ...]
这是因为我们创建了5000个虚拟线程,并且使用了一组非常有限的OS线程进行调度。
让我们尝试将并行度提高到20_000,这是经典Executor
无法实现的:
Execution time: 1219 ms
[user-0, user-1, user-2, ...]
不仅成功执行,而且比在OS线程上四倍小的工作负载完成得更快!
让我们将并行度提高到100_000,看看会发生什么:
Execution time: 1587 ms
[user-0, user-1, user-2, ...]
虽然观察到了显著的开销,但仍然可以正常工作。
如果我们将并行度提高到1_000_000呢?
Execution time: 6416 ms
[user-0, user-1, user-2, ...]
2_000_000?
Execution time: 12906 ms
[user-0, user-1, user-2, ...]
5_000_000?
Execution time: 25952 ms
[user-0, user-1, user-2, ...]
如我们所见,我们可以轻松地扩展到以前无法用操作系统线程实现的高并行度。 这一优势,加上在较小并行工作负载上的性能提升,是使用虚拟线程对阻塞操作进行并行处理的主要好处。
3.3. 虚拟线程与Parallel Collectors的旧版本
最简单的方法是升级到库的最新版本来利用虚拟线程,但如果无法做到,我们也可以在运行在JDK21上的2.x.y版本中实现这一点。
诀窍是手动提供Executors.newVirtualThreadPerTaskExecutor()
作为执行器,并设置最大并行度为Integer.MAX_VALUE
:
@Test
public void processInParallelOnVirtualThreadsParallelCollectors2() {
int parallelProcesses = 100_000;
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(
i -> fetchById(i), toList(),
Executors.newVirtualThreadPerTaskExecutor(), Integer.MAX_VALUE))
.join());
log.info("{}", result);
}
5. 总结
在这篇文章中,我们有机会看到如何轻松地使用Parallel Collectors库利用虚拟线程,它在性能上明显优于传统的基于操作系统线程的解决方案。我们的测试机器在大约16000个线程时达到了资源限制,而我们能够轻易地扩展到数百万个虚拟线程。
如往常一样,代码示例可以在GitHub上找到。