1. 引言

虚拟线程是JDK 21正式引入的一项实用功能,旨在提升高吞吐量应用的性能。

但JDK没有内置使用虚拟线程的任务调度工具,因此我们需要自己编写基于虚拟线程运行的任务调度器。

本文将使用Thread.sleep()方法和ScheduledExecutorService类,为虚拟线程创建自定义调度器。

2. 什么是虚拟线程?

虚拟线程在JEP-444中被引入,作为Thread类的轻量级版本,最终提升了高吞吐量应用的并发能力。

虚拟线程占用的空间远小于常规操作系统线程(或平台线程)。因此,我们可以在应用中同时创建比平台线程多得多的虚拟线程。这无疑增加了最大并发单元数,从而提高了应用的吞吐量。

关键点在于:虚拟线程并不比平台线程更快。它们在应用中只是数量更多,从而能执行更多并行任务。

虚拟线程成本低廉,因此我们不需要像对待平台线程、网络或数据库连接这类昂贵资源那样使用资源池技术。在现代计算机上,我们几乎可以无限创建虚拟线程而不会遇到内存问题。

最后,虚拟线程是动态的,而平台线程大小固定。因此,虚拟线程比平台线程更适合处理小型任务,比如简单的HTTP或数据库调用。

3. 调度虚拟线程

我们已经看到虚拟线程的一大优势是轻量且廉价。在普通机器上,我们可以轻松创建数十万个虚拟线程而不会出现内存溢出错误。因此,像对待平台线程这类昂贵资源那样对虚拟线程进行池化并没有太大意义。

使用线程池会带来额外开销:需要为池中的可用线程排队任务,这增加了复杂性并可能降低速度。此外,Java中的大多数线程池都受限于平台线程数量,而这个数量总是小于程序中可能的虚拟线程数量。

因此,我们必须避免将虚拟线程与线程池API(如ForkJoinPoolThreadPoolExecutor)一起使用。相反,我们应该为每个任务创建一个新的虚拟线程。

目前,Java没有提供标准API来调度虚拟线程,就像我们使用ScheduledExecutorServiceschedule()方法那样。所以,要让虚拟线程有效执行定时任务,我们需要自己编写调度器。

3.1. 使用Thread.sleep()调度虚拟线程

最直接的自定义调度器实现方式是使用Thread.sleep()方法让当前线程等待执行:

static Future<?> schedule(Runnable task, int delay, TemporalUnit unit, ExecutorService executorService) {
    return executorService.submit(() -> {
        try {
            Thread.sleep(Duration.of(delay, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        task.run();
    });
}

schedule()方法接收待调度的taskdelay时间和一个ExecutorService。然后我们通过ExecutorServicesubmit()启动任务。在try块中,我们调用Thread.sleep()使执行任务的线程等待指定延迟。线程在等待时可能被中断,因此我们通过处理InterruptedException来中断当前线程执行。

等待结束后,我们调用接收到的taskrun()方法。

要使用自定义schedule()方法调度虚拟线程,需要向其传递一个虚拟线程的执行器服务:

ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

try (virtualThreadExecutor) {
    var taskResult = schedule(() -> 
      System.out.println("在调度的虚拟线程上运行!"), 5, ChronoUnit.SECONDS,
      virtualThreadExecutor);

    try {
        Thread.sleep(10 * 1000); // 睡眠10秒等待任务结果
    } catch (InterruptedException e) {
        Thread.currentThread()
          .interrupt();
    }

    System.out.println(taskResult.get());
}

首先,我们实例化一个ExecutorService,为每个提交的任务创建一个新的虚拟线程。然后,我们将virtualThreadExecutor变量包装在try-with-resources语句中,保持执行器服务开启直到使用完毕。或者,使用完执行器服务后,我们可以通过shutdown()正确关闭它。

我们调用schedule()在5秒后运行任务,然后等待10秒再尝试获取任务执行结果。

3.2. 使用SingleThreadExecutor调度虚拟线程

我们了解了如何使用sleep()为虚拟线程调度任务。另一种方法是为每个提交的任务在虚拟线程执行器中实例化一个新的单线程调度器:

static Future<?> schedule(Runnable task, int delay, TimeUnit unit, ExecutorService executorService) {
    return executorService.submit(() -> {
        ScheduledExecutorService singleThreadScheduler = Executors.newSingleThreadScheduledExecutor();

        try (singleThreadScheduler) {
            singleThreadScheduler.schedule(task, delay, unit);
        }
    });
}

代码同样使用作为参数传递的虚拟线程ExecutorService提交任务。但现在,我们为每个任务通过newSingleThreadScheduledExecutor()方法实例化一个单线程的ScheduledExecutorService

然后在try-with-resources块中,我们使用单线程执行器的schedule()方法调度任务。该方法接受taskdelay作为参数,不像sleep()那样会抛出受检的InterruptedException

最后,我们可以使用schedule()向虚拟线程执行器调度任务:

ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

try (virtualThreadExecutor) {
    var taskResult = schedule(() -> 
      System.out.println("在调度的虚拟线程上运行!"), 5, TimeUnit.SECONDS,
      virtualThreadExecutor);

    try {
        Thread.sleep(10 * 1000); // 睡眠10秒等待任务结果
    } catch (InterruptedException e) {
        Thread.currentThread()
          .interrupt();
    }

    System.out.println(taskResult.get());
}

这与3.1节schedule()方法的用法类似,但这里我们传递的是TimeUnit而不是ChronoUnit

3.3. sleep() vs. 单线程调度执行器对比

sleep()调度方式中,我们只是调用一个方法在真正运行任务前等待。因此: ✅ 代码逻辑直观,易于理解 ✅ 调试更简单

而使用单线程调度执行器的方式: ❌ 依赖库的调度器代码,调试和排查问题可能更困难

此外,如果选择sleep(),我们只能调度任务在固定延迟后运行。而使用ScheduledExecutorService,我们可以访问三种调度方法:schedule()scheduleAtFixedRate()scheduleWithFixedDelay()

ScheduledExecutorServiceschedule()方法仅添加延迟,类似sleep()。而scheduleAtFixedRate()scheduleWithFixedDelay()方法增加了周期性调度功能,使我们能按固定时间间隔重复执行任务。因此,使用Java内置的ScheduledExecutorService库在任务调度上提供了更多灵活性。

4. 结论

本文介绍了虚拟线程相比传统平台线程的几大优势,并展示了如何使用Thread.sleep()ScheduledExecutorService为虚拟线程调度任务。

完整源代码可在GitHub获取。


原始标题:How to use virtual threads with ScheduledExecutorService | Baeldung