1. 线程池简介

线程是操作系统中非常宝贵的资源。默认情况下,每个线程会占用大约 1MB 的内存(栈大小)。

在某些 JVM 实现中,这段内存不仅需要分配,还需要初始化为零。此外,创建线程还需要调用系统级 API。所有这些操作使得线程的创建成为一个相对耗时的过程。

最后,如果我们希望支持优雅关闭(graceful shutdown),还需要处理线程提前中断的信息。

因此,在应用启动前预先创建好足够数量的线程是非常重要的,这样可以避免每次用户请求时都创建线程带来的性能损耗。同时,集中管理线程也更便于维护和控制。我们可以使用 ExecutorService 来实现这一目标。

在 Kotlin 中创建和管理线程池的方式与 Java 非常相似,但语法更简洁,而且借助 Kotlin 强大的构造函数机制,将线程池注入到业务组件中也更方便。


2. ExecutorService 的结构

一个典型的 ExecutorService 包含多个参数:

val workerPool: ExecutorService = ThreadPoolExecutor(
  corePoolSize,
  maximumPoolSize,
  keepAliveTime,
  TimeUnit.SECONDS,
  workQueue,
  threadFactory,
  handler
)

上述代码创建了一个线程池:

  • corePoolSize:核心线程数,线程池初始化时会创建这些线程。
  • maximumPoolSize:最大线程数,当任务队列非空时,线程池会创建新线程,直到达到这个上限。
  • keepAliveTime:线程空闲时间,超过这个时间的空闲线程将被终止并回收。
  • workQueue:任务队列,用于存放待执行的任务。
  • threadFactory:线程工厂,用于创建线程对象。
  • handler:拒绝策略,当任务无法被接受时的处理方式。

3. Executors 工具类

幸运的是,我们并不总是需要手动配置所有参数。Java 提供了 Executors 工具类,包含了一些常用的线程池模板。

3.1 固定大小线程池(Fixed Thread Pool)

适用于任务流稳定、负载均衡的场景

val workerPool: ExecutorService = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors()
)

一个典型的应用场景是需要持续利用系统资源时。线程数量应根据任务中阻塞操作的比例进行调整。

如果任务不频繁或对性能要求不高,可以使用单线程池:

val workerPool: ExecutorService = Executors.newSingleThreadExecutor()

// Later, in some method
println("I am working hard")
workerPool.submit {
    Thread.sleep(100) // Imitate slow IO
    println("I am reporting on the progress")
}
println("Meanwhile I continue to work")

3.2 缓存线程池(Cached Thread Pool)

根据任务负载动态调整资源使用

val workerPool: ExecutorService = Executors.newCachedThreadPool()

它会复用已有线程,必要时最多创建 Integer.MAX_VALUE 个线程。每个线程空闲时间不超过 60 秒。这种机制非常灵活,但缺乏背压控制,在高并发下可能导致 OOM(内存溢出)

我们也可以手动创建类似行为的线程池:

val corePoolSize = 4
val maximumPoolSize = corePoolSize * 4
val keepAliveTime = 100L
val workQueue = SynchronousQueue<Runnable>()
val workerPool: ExecutorService = ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue
)

使用 SynchronousQueue 作为任务队列,当线程数达到上限后,新任务将被阻塞直到有线程可用。

3.3 工作窃取线程池(Work-Stealing Thread Pool)

这是 Java 8 引入的 ForkJoinPool 类型,**不是传统的 ThreadPoolExecutor**。它主要用于执行 ForkJoinTask 类型的任务,这些任务可以被拆分成子任务,并由线程池中的多个线程并发执行。

我们网站有一篇关于 ForkJoin API 的详细示例,有兴趣可以参考。

3.4 定时调度线程池(Scheduled Thread Pool)

适用于需要延迟执行或周期性执行任务的场景

val counter = AtomicInteger(0)
val worker = Executors.newSingleThreadScheduledExecutor()
worker.scheduleAtFixedRate({ counter.incrementAndGet() }, 0, 100, TimeUnit.MILLISECONDS)

如果某个任务执行过程中发生异常,执行它的线程会终止,但调度器不会中断。下次任务触发时,系统会重新创建线程。


4. 常见线程池定制选项

除了使用默认配置,我们还可以根据需要对线程池进行定制。

4.1 自定义线程工厂(ThreadFactory)

最常见的是为线程池中的线程命名,便于日志追踪

val worker = Executors.newFixedThreadPool(4, object : ThreadFactory {
    private val counter = AtomicInteger(0)
    override fun newThread(r: Runnable): Thread =
        Thread(null, r, "panda-thread-${counter.incrementAndGet()}")
})

这样可以清晰地看到线程池中线程的生命周期和归属。

4.2 任务队列与拒绝策略

任务队列是线程池的重要组成部分。如果使用有界队列,当队列满时需要定义拒绝策略;如果是无界队列,则需要注意内存溢出问题。

示例代码如下:

class Task(val started: Long) : Callable<Long> {
    override fun call() = (System.currentTimeMillis() - started).also { Thread.sleep(100) }
}

val workerQueue = LinkedBlockingQueue<Runnable>()
val worker = ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS, workerQueue)
buildList {
    repeat(6) { add(Task(System.currentTimeMillis())) }
}.let(worker::invokeAll)
  .map(Future<Long>::get)
  .also { println(it) }

输出可能类似:[4, 4, 110, 110, 211, 215],说明任务在队列中排队等待执行。

当任务量超出线程池处理能力时,可以选择以下拒绝策略:

DiscardPolicy:直接丢弃任务
DiscardOldestPolicy:丢弃最早的任务,执行新任务
AbortPolicy:抛出异常
CallerRunsPolicy:由调用线程执行任务
自定义策略:实现 RejectedExecutionHandler


5. 线程池的关闭

关闭线程池有三种常见方式:

5.1 shutdown()

停止接收新任务,但会继续执行已提交的任务:

val worker = Executors.newFixedThreadPool(2)
buildList {
    repeat(5) { add(Task(System.currentTimeMillis())) }
}.map(worker::submit)
worker.shutdown()
worker.submit { println("any task, really") } // 会触发拒绝策略

最后一行将触发 AbortPolicy(默认策略)。

5.2 awaitTermination(timeout, unit)

等待线程池中所有任务执行完毕,超时返回:

worker.awaitTermination(10, TimeUnit.MILLISECONDS)

线程池关闭后,新任务将被拒绝,但已有任务继续执行。

5.3 shutdownNow()

立即中断所有线程,并返回未执行的任务列表:

val unrun: List<Runnable> = worker.shutdownNow()

Java 的线程中断是协作式的,因此线程是否真正停止取决于任务本身的实现。

⚠️ 注意shutdown()shutdownNow() 是非阻塞的,而 awaitTermination() 会阻塞当前线程直到超时或所有任务完成。


6. 总结

本文我们讨论了线程池的创建原因、常见类型及其使用方式。Java 提供的 Executors 工具类覆盖了大部分常用场景,但在需要更精细控制时,也可以通过 ThreadPoolExecutor 进行定制。

  • ThreadPoolExecutor 适用于执行一次性任务
  • ForkJoinPool 更适合复杂任务的并行拆解
  • ScheduledExecutorService 支持定时或周期任务

所有示例代码均可在 GitHub 上找到。


原始标题:Creating a Thread Pool in Kotlin