1. 线程池简介
线程是操作系统中非常宝贵的资源。默认情况下,每个线程会占用大约 1MB 的内存(栈大小)。
在某些 JVM 实现中,这段内存不仅需要分配,还需要初始化为零。此外,创建线程还需要调用系统级 API。所有这些操作使得线程的创建成为一个相对耗时的过程。
最后,如果我们希望支持优雅关闭(graceful shutdown),还需要处理线程提前中断的信息。
因此,在应用启动前预先创建好足够数量的线程是非常重要的,这样可以避免每次用户请求时都创建线程带来的性能损耗。同时,集中管理线程也更便于维护和控制。我们可以使用 ExecutorService
来实现这一目标。
在 Kotlin 中创建和管理线程池的方式与 Java 非常相似,但语法更简洁,而且借助 Kotlin 强大的构造函数机制,将线程池注入到业务组件中也更方便。
2. ExecutorService 的结构
一个典型的 ExecutorService
包含多个参数:
val workerPool: ExecutorService = ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
handler
)
上述代码创建了一个线程池:
corePoolSize
:核心线程数,线程池初始化时会创建这些线程。maximumPoolSize
:最大线程数,当任务队列非空时,线程池会创建新线程,直到达到这个上限。keepAliveTime
:线程空闲时间,超过这个时间的空闲线程将被终止并回收。workQueue
:任务队列,用于存放待执行的任务。threadFactory
:线程工厂,用于创建线程对象。handler
:拒绝策略,当任务无法被接受时的处理方式。
3. Executors 工具类
幸运的是,我们并不总是需要手动配置所有参数。Java 提供了 Executors
工具类,包含了一些常用的线程池模板。
3.1 固定大小线程池(Fixed Thread Pool)
适用于任务流稳定、负载均衡的场景:
val workerPool: ExecutorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
)
一个典型的应用场景是需要持续利用系统资源时。线程数量应根据任务中阻塞操作的比例进行调整。
如果任务不频繁或对性能要求不高,可以使用单线程池:
val workerPool: ExecutorService = Executors.newSingleThreadExecutor()
// Later, in some method
println("I am working hard")
workerPool.submit {
Thread.sleep(100) // Imitate slow IO
println("I am reporting on the progress")
}
println("Meanwhile I continue to work")
3.2 缓存线程池(Cached Thread Pool)
根据任务负载动态调整资源使用:
val workerPool: ExecutorService = Executors.newCachedThreadPool()
它会复用已有线程,必要时最多创建 Integer.MAX_VALUE
个线程。每个线程空闲时间不超过 60 秒。这种机制非常灵活,但缺乏背压控制,在高并发下可能导致 OOM(内存溢出)。
我们也可以手动创建类似行为的线程池:
val corePoolSize = 4
val maximumPoolSize = corePoolSize * 4
val keepAliveTime = 100L
val workQueue = SynchronousQueue<Runnable>()
val workerPool: ExecutorService = ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue
)
使用 SynchronousQueue
作为任务队列,当线程数达到上限后,新任务将被阻塞直到有线程可用。
3.3 工作窃取线程池(Work-Stealing Thread Pool)
这是 Java 8 引入的 ForkJoinPool
类型,**不是传统的 ThreadPoolExecutor
**。它主要用于执行 ForkJoinTask
类型的任务,这些任务可以被拆分成子任务,并由线程池中的多个线程并发执行。
我们网站有一篇关于 ForkJoin API 的详细示例,有兴趣可以参考。
3.4 定时调度线程池(Scheduled Thread Pool)
适用于需要延迟执行或周期性执行任务的场景:
val counter = AtomicInteger(0)
val worker = Executors.newSingleThreadScheduledExecutor()
worker.scheduleAtFixedRate({ counter.incrementAndGet() }, 0, 100, TimeUnit.MILLISECONDS)
如果某个任务执行过程中发生异常,执行它的线程会终止,但调度器不会中断。下次任务触发时,系统会重新创建线程。
4. 常见线程池定制选项
除了使用默认配置,我们还可以根据需要对线程池进行定制。
4.1 自定义线程工厂(ThreadFactory)
最常见的是为线程池中的线程命名,便于日志追踪:
val worker = Executors.newFixedThreadPool(4, object : ThreadFactory {
private val counter = AtomicInteger(0)
override fun newThread(r: Runnable): Thread =
Thread(null, r, "panda-thread-${counter.incrementAndGet()}")
})
这样可以清晰地看到线程池中线程的生命周期和归属。
4.2 任务队列与拒绝策略
任务队列是线程池的重要组成部分。如果使用有界队列,当队列满时需要定义拒绝策略;如果是无界队列,则需要注意内存溢出问题。
示例代码如下:
class Task(val started: Long) : Callable<Long> {
override fun call() = (System.currentTimeMillis() - started).also { Thread.sleep(100) }
}
val workerQueue = LinkedBlockingQueue<Runnable>()
val worker = ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS, workerQueue)
buildList {
repeat(6) { add(Task(System.currentTimeMillis())) }
}.let(worker::invokeAll)
.map(Future<Long>::get)
.also { println(it) }
输出可能类似:[4, 4, 110, 110, 211, 215]
,说明任务在队列中排队等待执行。
当任务量超出线程池处理能力时,可以选择以下拒绝策略:
✅ DiscardPolicy:直接丢弃任务
✅ DiscardOldestPolicy:丢弃最早的任务,执行新任务
✅ AbortPolicy:抛出异常
✅ CallerRunsPolicy:由调用线程执行任务
✅ 自定义策略:实现 RejectedExecutionHandler
5. 线程池的关闭
关闭线程池有三种常见方式:
5.1 shutdown()
停止接收新任务,但会继续执行已提交的任务:
val worker = Executors.newFixedThreadPool(2)
buildList {
repeat(5) { add(Task(System.currentTimeMillis())) }
}.map(worker::submit)
worker.shutdown()
worker.submit { println("any task, really") } // 会触发拒绝策略
最后一行将触发 AbortPolicy
(默认策略)。
5.2 awaitTermination(timeout, unit)
等待线程池中所有任务执行完毕,超时返回:
worker.awaitTermination(10, TimeUnit.MILLISECONDS)
线程池关闭后,新任务将被拒绝,但已有任务继续执行。
5.3 shutdownNow()
立即中断所有线程,并返回未执行的任务列表:
val unrun: List<Runnable> = worker.shutdownNow()
Java 的线程中断是协作式的,因此线程是否真正停止取决于任务本身的实现。
⚠️ 注意:shutdown()
和 shutdownNow()
是非阻塞的,而 awaitTermination()
会阻塞当前线程直到超时或所有任务完成。
6. 总结
本文我们讨论了线程池的创建原因、常见类型及其使用方式。Java 提供的 Executors
工具类覆盖了大部分常用场景,但在需要更精细控制时,也可以通过 ThreadPoolExecutor
进行定制。
ThreadPoolExecutor
适用于执行一次性任务ForkJoinPool
更适合复杂任务的并行拆解ScheduledExecutorService
支持定时或周期任务
所有示例代码均可在 GitHub 上找到。