1. Introduction
Each OS thread is an expensive resource. By default, a typical 64-bit JVM reserves 1 MB of memory for each thread’s stack.
In some JVM implementations, that piece of memory has to be not only allocated but also filled with zeroes. Apart from that, a system call must be made to allocate a native thread. All this makes the creation of a thread a rather lengthy procedure.
Finally, if we have to support a graceful shutdown, something has to keep track of the running threads and tell them when to stop early.
Therefore it’s crucial to pre-create enough threads for our application so that starting a thread won’t affect the performance of each user accessing our software. It’s also a good idea to centralize the thread management. To do that, we can use ExecutorServices. Creating and managing thread pools in Kotlin doesn’t differ too much from Java. However, the syntax is slightly different, and the injection of executor services into a business logic component is much easier because we can leverage powerful Kotlin constructors.
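As a rough illustration of the constructor injection mentioned above, here is a minimal sketch. The class ReportService and its method buildReport are hypothetical names invented for this example. The pool arrives as a constructor parameter with a default value, so production code can rely on the default while a test passes its own executor:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Hypothetical business component: the pool is injected through the constructor,
// and the default value is only an assumption made for this sketch.
class ReportService(
    private val workerPool: ExecutorService = Executors.newFixedThreadPool(2)
) {
    // Runs the (trivial) report generation on the injected pool.
    fun buildReport(id: Int): Future<String> =
        workerPool.submit(Callable { "report-$id" })
}
```

Calling ReportService(Executors.newSingleThreadExecutor()) swaps the pool without touching the class itself. Whoever creates the pool remains responsible for shutting it down.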
2. The Anatomy of an Executor Service
A typical executor service contains many parts:
val workerPool: ExecutorService = ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    workQueue,
    threadFactory,
    handler
)
This will create a thread pool that starts its threads on demand. While fewer than corePoolSize threads are running, each submitted task gets a new thread, even if other worker threads are idle. Once all core threads exist, new tasks go to workQueue. Only when workQueue refuses a task (for example, because it’s bounded and full) will the pool create additional threads, up to maximumPoolSize. A factory threadFactory will create all these threads.
Threads above corePoolSize that have been idle for keepAliveTime will terminate, and the garbage collector will reclaim them. If the pool can neither queue a task nor start a new thread for it, the handler will deal with the rejected task.
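To watch the order in which a ThreadPoolExecutor uses its core threads, its queue, and its extra threads, we can try a minimal sketch like the one below. The function name observePoolGrowth and the chosen sizes (one core thread, two threads at most, a queue of capacity one) are assumptions made for the illustration. Every task blocks on a latch, so no thread becomes free while we are submitting:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Returns the pool size seen after each submission, or -1 when the task is rejected.
fun observePoolGrowth(): List<Int> {
    val release = CountDownLatch(1)
    val pool = ThreadPoolExecutor(
        1, 2, 1L, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(1)
    )
    val sizes = mutableListOf<Int>()
    try {
        repeat(4) {
            try {
                pool.execute { release.await() } // the task holds its thread until released
                sizes.add(pool.poolSize)
            } catch (e: RejectedExecutionException) {
                sizes.add(-1) // default AbortPolicy: neither queue nor pool has room
            }
        }
    } finally {
        release.countDown() // let the blocked tasks finish
        pool.shutdown()
    }
    return sizes
}
```

The result is [1, 1, 2, -1]: the first task starts the core thread, the second waits in the queue, the third finds the queue full and gets an extra thread, and the fourth is rejected.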
3. Executors API
Luckily, we don’t need to choose all of these parameters every time. A utility class Executors provides several handy presets. Let’s look at some of them.
3.1. A Fixed Thread Pool
A fixed thread pool is good when the flow of tasks is steady and always roughly at the same level:
val workerPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
One obvious example of when a fixed thread pool can be useful is when we try to utilize the maximum of our resources all the time. The optimal number of threads varies depending on the amount of blocking non-CPU operations in the task.
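A commonly cited rule of thumb for this trade-off multiplies the number of cores by one plus the ratio of waiting time to computing time. The sketch below is only a starting point for measurements, not a definitive formula. The function name suggestedPoolSize and its parameters are made up for this example:

```kotlin
import kotlin.math.ceil
import kotlin.math.max

// Heuristic: threads = cores * (1 + wait / compute).
// waitMillis and computeMillis are average per-task estimates that we have to measure ourselves.
fun suggestedPoolSize(cores: Int, waitMillis: Double, computeMillis: Double): Int {
    require(cores > 0) { "cores must be positive" }
    require(waitMillis >= 0.0) { "waitMillis must not be negative" }
    require(computeMillis > 0.0) { "computeMillis must be positive" }
    return max(1, ceil(cores * (1 + waitMillis / computeMillis)).toInt())
}
```

For purely CPU-bound tasks, suggestedPoolSize(8, 0.0, 10.0) gives 8, one thread per core. For tasks that wait nine times longer than they compute, suggestedPoolSize(8, 90.0, 10.0) gives 80.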
Another frequent case is a single-thread pool. It comes in useful when we need to run tasks asynchronously, off the calling thread, but the tasks are infrequent or not performance-critical:
val workerPool: ExecutorService = Executors.newSingleThreadExecutor()
// Later, in some method
// Do some in-thread CPU-intensive work
println("I am working hard")
workerPool.submit {
    Thread.sleep(100) // Imitate slow IO
    println("I am reporting on the progress")
}
println("Meanwhile I continue to work")
3.2. A Cached Thread Pool Is Load Adjustable
A cached thread pool will utilize resources according to the requirements of submitted tasks:
val workerPool: ExecutorService = Executors.newCachedThreadPool()
It will try to reuse existing threads for submitted tasks but will create up to Integer.MAX_VALUE threads if needed. A thread that stays idle for 60 seconds terminates. As such, the cached pool is a very sharp tool that doesn’t include any backpressure mechanism. A sudden peak in load can bring the system down with an OutOfMemoryError.
We can achieve a similar effect but with better control by creating a ThreadPoolExecutor manually:
val corePoolSize = 4
val maximumPoolSize = corePoolSize * 4
val keepAliveTime = 100L
val workQueue = SynchronousQueue<Runnable>()
val workerPool: ExecutorService = ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue
)
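In the example, the system will create at most maximumPoolSize threads. As we used a SynchronousQueue for the task queue of the pool, mimicking Executors.newCachedThreadPool(), tasks are never stored: each one is handed directly to a thread. On hitting the limit of maximumPoolSize simultaneous tasks, the next submission is rejected, and with the default AbortPolicy, the submitter gets a RejectedExecutionException. If we’d rather slow the submitter down, we can pass a different rejection handler, such as CallerRunsPolicy.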
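To check what a SynchronousQueue-backed pool does at its limit, we can run a small experiment. The sketch below assumes the default rejection handler, and the function name countRejections is invented for this example. Each task blocks on a latch, so no thread is ever free to take over a new task during the submissions:

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Submits blocking tasks to a pool without queue capacity and counts the rejected ones.
fun countRejections(maximumPoolSize: Int, tasks: Int): Int {
    val release = CountDownLatch(1)
    val pool = ThreadPoolExecutor(
        1, maximumPoolSize, 100L, TimeUnit.SECONDS, SynchronousQueue<Runnable>()
    )
    var rejected = 0
    try {
        repeat(tasks) {
            try {
                pool.execute { release.await() } // keeps its thread busy until released
            } catch (e: RejectedExecutionException) {
                rejected++ // all threads busy and the queue cannot hold anything
            }
        }
    } finally {
        release.countDown()
        pool.shutdown()
    }
    return rejected
}
```

With two threads at most and five tasks, countRejections(2, 5) returns 3: the pool accepts one task per thread and rejects the rest.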
3.3. A Work-Stealing Thread Pool
A work-stealing thread pool is a special ExecutorService, which isn’t a version of ThreadPoolExecutor. It executes Runnables just like any other ExecutorService, but its true purpose is to execute ForkJoinTasks. Due to their inner structure, they can be broken into subtasks and executed by all threads in the pool at once. We have an example on our site with an in-depth discussion of the ForkJoin Java API.
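To give a feel for how a ForkJoinTask splits itself into subtasks, here is a minimal sketch that sums a range of numbers. The names RangeSum and sumRange, as well as the threshold of 1,000 elements, are arbitrary choices for this example. It creates a ForkJoinPool directly because the ExecutorService interface has no method that accepts a ForkJoinTask:

```kotlin
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Sums the inclusive range [from, to], splitting it in halves until the pieces are small.
class RangeSum(private val from: Long, private val to: Long) : RecursiveTask<Long>() {
    public override fun compute(): Long {
        if (to - from <= 1_000) return (from..to).sum() // small enough: compute directly
        val mid = (from + to) / 2
        val left = RangeSum(from, mid).also { it.fork() } // may be stolen by another thread
        val right = RangeSum(mid + 1, to)
        return right.compute() + left.join()
    }
}

fun sumRange(from: Long, to: Long): Long {
    val pool = ForkJoinPool() // parallelism defaults to the number of available processors
    try {
        return pool.invoke(RangeSum(from, to))
    } finally {
        pool.shutdown()
    }
}
```

For instance, sumRange(1L, 100_000L) returns 5000050000.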
3.4. A Scheduled Thread Pool
A scheduled thread pool is another reason why we should create thread pools and not bare threads. It will execute a task (or several tasks) with a given delay or at a fixed rate:
val counter = AtomicInteger(0)
val worker = Executors.newSingleThreadScheduledExecutor()
worker.scheduleAtFixedRate({ counter.incrementAndGet() }, 0, 100, TimeUnit.MILLISECONDS)
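If a task invocation throws an exception, the executor suppresses all subsequent invocations of that task, so the schedule stops silently. The exception is available only through the ScheduledFuture that scheduleAtFixedRate() returns. The worker thread itself survives and keeps serving other scheduled tasks. If the schedule has to survive errors, we need to catch exceptions inside the task.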
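The effect of an exception on a periodic task is easy to verify with a small experiment. In the sketch below, the function name runsBeforeFailure and the 10 ms period are assumptions for the illustration. The task throws on a chosen run, and we count how many runs happen in total:

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Schedules a task that throws on run number failOnRun and returns the total number of runs.
fun runsBeforeFailure(failOnRun: Int): Int {
    val counter = AtomicInteger(0)
    val worker = Executors.newSingleThreadScheduledExecutor()
    val future = worker.scheduleAtFixedRate({
        if (counter.incrementAndGet() == failOnRun) throw IllegalStateException("boom")
    }, 0, 10, TimeUnit.MILLISECONDS)
    try {
        future.get() // a periodic task never completes normally; this waits for the failure
    } catch (e: ExecutionException) {
        // e.cause is the IllegalStateException thrown by the task
    }
    Thread.sleep(100) // leave room for further runs, in case there were any
    worker.shutdownNow()
    return counter.get()
}
```

Here, runsBeforeFailure(3) returns 3: once the third run throws, the task never runs again, even though the executor is still alive for another 100 ms.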
4. Common Thread Pool Customizations
We have already seen various versions of ThreadPoolExecutor. We can set limits for the initial thread pool size and for the maximum number of threads in the pool. We also touched lightly on the task queue and the thread factory. Let’s see what other options we have for making a thread pool fit our purposes better.
4.1. Creating Threads for the Pools
One of the most common modifications for the ThreadFactory is to change the name given to the threads in the thread pool so that it complies with the logging conventions of the team or company:
val worker = Executors.newFixedThreadPool(4, object : ThreadFactory {
    private val counter = AtomicInteger(0)
    override fun newThread(r: Runnable): Thread =
        Thread(null, r, "panda-thread-${counter.incrementAndGet()}")
})
4.2. Work Queue and Rejection Policy
The work queue is a very important part of a thread pool. If it’s a bounded queue, then there might come a time when a producer has a task that the consumer has no capacity to consume. In that case, we need to define a rejection policy. If it’s an unbounded queue, then it never rejects tasks but can grow until the application runs out of memory, so we must watch it closely.
Let’s look at what happens when we submit more tasks than there are threads available:
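class Task(val started: Long) : Callable<Long> {
    // Returns how long the task waited before it started, then simulates 100 ms of work
    override fun call() = (System.currentTimeMillis() - started).also { Thread.sleep(100) }
}
val workerQueue = LinkedBlockingQueue<Runnable>() // unbounded
// 2 threads; the keep-alive value is irrelevant here because core and maximum sizes are equal
val worker = ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, workerQueue)
buildList {
    repeat(6) { add(Task(System.currentTimeMillis())) }
}.let(worker::invokeAll)
    .map(Future<Long>::get)
    .also { println(it) }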
The output will produce a kind of a ladder with values falling into three distinct groups, each bigger than the previous by about 100, like this: [4, 4, 110, 110, 211, 215]. That means the thread pool kept the tasks in the queue until there were free threads to execute them.
So if we continue our example and offer more tasks than the thread pool can handle, we can choose from the following options:
- DiscardPolicy – just ignore the task
- DiscardOldestPolicy – drop the oldest task still waiting in the queue; try to submit this one again
- AbortPolicy – throw an exception
- CallerRunsPolicy – the caller thread will run the task instead
- Our own custom policy – any implementation of RejectedExecutionHandler will do
The chosen rejection policy comes into effect whenever the pool can’t accept a task: the work queue is full and the pool already has maximumPoolSize threads, or the pool has been shut down.
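As an illustration of the last option in the list, a custom policy, here is a minimal sketch of a handler that only counts the tasks it drops. The names CountingRejectionHandler and countDroppedTasks and the pool dimensions (two threads, a queue of capacity two) are assumptions made for this example:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Custom policy: silently drop the task, but remember how many were dropped.
class CountingRejectionHandler : RejectedExecutionHandler {
    val rejected = AtomicInteger(0)
    override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
        rejected.incrementAndGet()
    }
}

// Submits blocking tasks to a pool with 2 threads and 2 queue slots; returns the number dropped.
fun countDroppedTasks(tasks: Int): Int {
    val release = CountDownLatch(1)
    val handler = CountingRejectionHandler()
    val pool = ThreadPoolExecutor(
        2, 2, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(2), handler
    )
    try {
        repeat(tasks) { pool.execute { release.await() } }
    } finally {
        release.countDown() // unblock the accepted tasks
        pool.shutdown()
    }
    return handler.rejected.get()
}
```

The pool can hold four tasks at once, two running and two queued, so countDroppedTasks(6) returns 2. A production handler would more likely log the task or send it elsewhere than just count it.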
5. Terminating a Thread Pool
Another case when the rejection policy might affect us is when we shut the thread pool down. Three methods are involved in terminating a thread pool. The first one is just shutdown():
val worker = Executors.newFixedThreadPool(2)
buildList {
    repeat(5) { add(Task(System.currentTimeMillis())) }
}.map(worker::submit)
worker.shutdown()
worker.submit { println("any task, really") }
The last line will invoke the rejection policy, which in this case is the default AbortPolicy, so the call throws a RejectedExecutionException.
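The second method is awaitTermination(). It doesn’t shut the pool down by itself. Instead, after a shutdown() call, it blocks until all tasks have finished or the timeout expires, whichever comes first:
worker.awaitTermination(10, TimeUnit.MILLISECONDS)
After shutdown(), with or without awaitTermination(), already submitted tasks continue to run, and new tasks are not accepted. The threads will terminate once they finish all running and queued tasks.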
The third way is to shutdownNow(). This will not only stop the pool from accepting new tasks but also immediately interrupt all the worker threads and return the list of tasks that were still waiting in the queue:
val unrun: List<Runnable> = worker.shutdownNow()
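While all the worker threads receive an interrupt signal, what happens next depends on the tasks, as thread cancellation in Java is cooperative. A task that never checks the interrupt flag or calls an interruptible method will simply run to completion.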
Both shutdown methods will return immediately, while awaitTermination() will block for the specified timeout.
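The three methods are often combined into one routine: ask politely, wait for a while, then force the shutdown. The sketch below follows that widely used pattern. The helper name shutdownGracefully and its parameters are our own choice:

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

// Stops accepting tasks, waits up to the timeout, then interrupts whatever is still running.
// Returns the tasks that never got a chance to start.
fun shutdownGracefully(pool: ExecutorService, timeout: Long, unit: TimeUnit): List<Runnable> {
    pool.shutdown() // no new tasks; running and queued tasks continue
    return try {
        if (pool.awaitTermination(timeout, unit)) emptyList() else pool.shutdownNow()
    } catch (e: InterruptedException) {
        val pending = pool.shutdownNow()
        Thread.currentThread().interrupt() // preserve the caller's interrupt status
        pending
    }
}
```

A call such as shutdownGracefully(worker, 5, TimeUnit.SECONDS) gives the pool five seconds to drain its queue before the remaining tasks are interrupted or returned to the caller.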
6. Conclusion
In this tutorial, we reviewed why we need to create thread pools and how we can do it. The Executors API covers the most frequent cases, but we can also customize thread pool settings if the task at hand requires it. ThreadPoolExecutor provides the engine for simple one-off tasks, while ForkJoinPool allows us to parallelize complex workflows adaptively. Scheduled thread pools can run delayed or repeated tasks.
As always, all the code can be found over on GitHub.