1. Introduction
In this tutorial, we’ll look into various types of coroutines we might want to create in our application and how to compose them together.
Articles on coroutines often stop at “Hello, world!” scenarios. However, our real-life tasks and existing code are much more complex. We want to bridge that gap and provide some useful tools for dealing with multiple aspects of a multi-threaded concurrent application.
2. A Task Closer to the Reality
Let’s consider a task that might arise during our actual work project. To meet the requirements for the task we must:
- make two requests to other endpoints, the order of the requests doesn’t matter
- apply some business logic
- write the result of the business logic procedure into the database via JDBC
- log some information for audit and debug
- return the response to the user
Let’s use coroutines where possible. Then some of the steps will lend themselves naturally to the coroutine approach, like two HTTP requests. Others, like a JDBC operation, are blocking and therefore will need more thinking:
fun processCallAsync(userInput: UserInput): Deferred<String> = async {
    val responseA = client.get(urlA, userInput.query)
    val responseB = client.get(urlB, userInput.query)
    (responseA.body + responseB.body)
        .also {
            storeToDatabase(userInput, it)
            logger.info("User input $userInput was processed with the result $it")
        }
}
The important property of the business logic step is that it requires both responses, so for our purposes it is enough to model it as a concatenation of the two response bodies.
3. Understanding Context
Coroutines are a language construct, not an actual representation of what is happening inside the Virtual Machine. Like any other set of instructions in the JVM universe, coroutines must have a thread on which to run. Threads, in turn, shouldn’t live unattached to anything but should reside in thread pools.
Apart from this, coroutines might need some business information to use during their runtime. One example of this is the logging MDC (Mapped Diagnostic Context). We can also redefine the uncaught exception handler and other settings.
All that can be done inside a construct that is called CoroutineContext. A CoroutineContext is usually provided by a CoroutineScope. A CoroutineScope manages the lifecycle of the coroutines it creates.
The most important part of a CoroutineContext is a CoroutineDispatcher: it defines the thread pool which provides the thread for the set of instructions which is the coroutine.
So, which parts of our “real” function do we want to execute in the same context and which we would like to separate?
There are three reasons to separate contexts:
- The function within a coroutine might block the thread which executes it, thus preventing other coroutines from using the same thread. This defeats the purpose of having cooperative concurrency. The context for a blocking task should contain its own dispatcher.
- The lifecycle of a sub-coroutine might differ from its creator. We might want to proceed with it even if the creator fails, or we might want to make sure that the creator will proceed no matter what happens to the sub-coroutine. The context for such tasks shouldn’t be a child of the caller context.
- We are adding new keys to a CoroutineContext. A context is immutable, so this is possible only by creating a new child context.
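To make the third reason more concrete, here is a minimal sketch of adding an element to a context. The coroutine names and the helper currentName() are made up for illustration; CoroutineName, withContext, and the + operator on contexts come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

// Hypothetical helper: reads the CoroutineName element from the caller's context.
suspend fun currentName(): String? = coroutineContext[CoroutineName]?.name

fun main(): Unit = runBlocking(CoroutineName("request-handler")) {
    println(currentName()) // request-handler

    // The + operator builds a new context; the outer context is left untouched.
    withContext(Dispatchers.IO + CoroutineName("db-writer")) {
        println(currentName()) // db-writer
    }

    println(currentName()) // request-handler
}
```

After the withContext block returns, the outer name is visible again, which shows that the new key lived only in the derived context.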
4. Blocking Input-Output Tasks
The advantage of coroutines is that, due to their cooperative nature, they can satisfy the same set of requirements as traditional threaded code with fewer threads. However, the computational load of coroutine code isn’t lower than that of threaded code. In fact, since the compiler turns each coroutine into a state machine with a certain amount of additional instructions for switching between coroutines, the computational load of coroutine-based code is slightly higher. So if the tasks performed by coroutines are CPU-heavy, we won’t get many advantages out of them.
The same is true for the IO-tasks if they block the thread in which they run. While the thread waits for the completion of an IO operation, it cannot do anything. This might even starve the CoroutineDispatcher out of threads, leading to Denial of Service.
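The effect of a blocked thread can be observed with a small experiment. The sketch below is only an illustration: timeFiveTasks() is a hypothetical helper, Thread.sleep() stands in for a blocking IO call, and delay() stands in for a non-blocking one. Both variants run five tasks on a dispatcher backed by a single thread.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Runs five 100 ms tasks on a single-threaded dispatcher and returns the elapsed time.
fun timeFiveTasks(blocking: Boolean): Long =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { singleThread ->
        measureTimeMillis {
            runBlocking(singleThread) {
                repeat(5) {
                    launch {
                        if (blocking) Thread.sleep(100) // holds the only thread
                        else delay(100)                 // releases the thread while waiting
                    }
                }
            }
        }
    }

fun main() {
    println("blocking:   ${timeFiveTasks(blocking = true)} ms")
    println("suspending: ${timeFiveTasks(blocking = false)} ms")
}
```

We would expect the blocking variant to take roughly five times longer, because each sleeping task keeps the only thread to itself, while the suspending tasks wait concurrently.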
4.1. A Separate Context for IO
It would be a good idea to at least offload blocking IO-tasks to a separate context. That way a sudden slowdown in disk or database operations won’t reduce the throughput of the main function:
launch(Dispatchers.IO) {
    storeToDatabase(userInput, it)
}
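The Kotlin coroutine library offers a standard Dispatchers.IO dispatcher for such tasks. We must remember, however, that its pool is bounded: by default it is limited to 64 threads or the number of CPU cores, whichever is larger, and it shares threads with Dispatchers.Default. Enough long blocking calls can therefore still starve it of threads.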
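When the caller needs the outcome of the blocking call, withContext() may be a better fit than launch(), since it suspends the caller and hands back the result. The following is a minimal sketch under assumptions: storeToDatabaseBlocking() is a hypothetical stand-in for the JDBC call, and its Int return value (rows written) is invented for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a blocking JDBC call; returns the number of rows written.
fun storeToDatabaseBlocking(record: String): Int {
    Thread.sleep(100) // simulates a blocking driver call
    return 1
}

// Suspends the caller while the blocking call runs on a Dispatchers.IO thread.
suspend fun storeToDatabaseSuspending(record: String): Int =
    withContext(Dispatchers.IO) { storeToDatabaseBlocking(record) }

fun main(): Unit = runBlocking {
    val rows = storeToDatabaseSuspending("some result")
    println("rows written: $rows") // rows written: 1
}
```

The thread that called storeToDatabaseSuspending() stays free for other coroutines while the IO thread is blocked.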
Another approach we can choose is to use FileChannels for file operations and R2DBC connectors to relational databases. That is to say, it is reasonable to go fully asynchronous in a coroutine-based application.
4.2. A Dedicated Executor for IO Tasks
Yet another way to address the problem would be to post our tasks to a queue and consume that queue with a single thread. Between posting a task and receiving an answer, the producer thread is free:
class AsyncWorker(val writerFunction: (Pair<UserInput, String>) -> Int) {
    // A single worker thread consumes the executor's internal task queue
    private val executor = Executors.newSingleThreadExecutor()

    fun submit(input: Pair<UserInput, String>): CompletableFuture<Int> =
        CompletableFuture.supplyAsync({ writerFunction(input) }, executor)
}

fun storeToDatabaseAsync(userInput: UserInput, result: String): CompletableFuture<Int> =
    asyncDbWriter.submit(userInput to result)
We may note how an Executor implements both the queue and its handling for us. When we want to see the result of this task, we can suspend the coroutine (without blocking its thread!) with the await() extension function from the helper library kotlinx-coroutines-jdk8:
storeToDatabaseAsync(userInput, result).await()
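For experimenting with this pattern in isolation, here is a self-contained sketch. It is a simplified variant, not the worker above: SingleThreadWorker is a hypothetical name, a plain String replaces Pair<UserInput, String>, and the writer function just returns the string length instead of touching a database.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Simplified, hypothetical variant of the worker: String stands in for the real input type.
class SingleThreadWorker(private val writerFunction: (String) -> Int) {
    private val executor = Executors.newSingleThreadExecutor()

    fun submit(input: String): CompletableFuture<Int> =
        CompletableFuture.supplyAsync({ writerFunction(input) }, executor)

    // Lets queued tasks finish and then stops the worker thread.
    fun shutdown() {
        executor.shutdown()
    }
}

fun main(): Unit = runBlocking {
    val worker = SingleThreadWorker { record -> record.length }
    try {
        // await() suspends this coroutine until the future completes.
        val written = worker.submit("hello").await()
        println(written) // 5
    } finally {
        worker.shutdown()
    }
}
```

Shutting the executor down matters in a standalone program, because its non-daemon worker thread would otherwise keep the JVM alive.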
5. Map-Reduce Pattern in Asynchronous Tasks
Now that we have got the complex IO task out of the way let’s deal with the thing the coroutines were invented for: simultaneous asynchronous network calls. There is no reason whatsoever to stay blocked and keep all that context loaded while crucial hundreds of milliseconds slip away.
5.1. Concurrent Launching of a Coroutine
Kotlin promotes the concept of explicit concurrency: nothing we write in Kotlin will run concurrently unless we consciously and intentionally ask the system to do so. In our case, it means that to get those two network calls on the wire at roughly the same time, we need to wrap them into async {}:
val responseA = async { client.get(urlA, userInput.query) }
val responseB = async { client.get(urlB, userInput.query) }
return responseA.await().body + responseB.await().body
When we need the results, we execute await() on the Deferred objects that async calls gave us. This approach is reminiscent of a Map-Reduce pattern – first we “map” our queries to various URLs and then we “reduce” their responses to compose a final answer.
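A runnable version of this idea could look like the sketch below. The names fakeGet() and fetchBoth() are hypothetical: fakeGet() stands in for client.get() and simply suspends for 200 ms, and coroutineScope {} supplies the scope that async needs.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for client.get(): suspends for 200 ms, then returns a body.
suspend fun fakeGet(url: String): String {
    delay(200)
    return "body of $url"
}

// coroutineScope provides the scope for async and waits for both children.
suspend fun fetchBoth(urlA: String, urlB: String): String = coroutineScope {
    val responseA = async { fakeGet(urlA) }
    val responseB = async { fakeGet(urlB) }
    responseA.await() + " + " + responseB.await()
}

fun main(): Unit = runBlocking {
    val elapsed = measureTimeMillis {
        println(fetchBoth("urlA", "urlB")) // body of urlA + body of urlB
    }
    // The calls overlap, so this should be close to 200 ms rather than 400 ms.
    println("took $elapsed ms")
}
```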
5.2. Bulk Concurrent Launching
However, our example is still a very simple one. Sometimes we need to call many sources known only at runtime, or just a lot of similar resources. In that case, we cannot create a variable for each such call. Instead, we can place all the calls via async {} first. They will start executing as soon as possible, while the calling coroutine continues its own execution until we call awaitAll() on them:
val result: List<Response> = urls.map { url -> async { client.get(url, userInput.query) } }
    .awaitAll()
We must remember, though, that awaitAll() will fail as soon as any one of the requests fails. The failed async coroutine will also cancel the scope we are running in. Alternatively, if we can tolerate some failures, we can catch them inside each async block and await the calls one by one:
urls.map { url -> async { runCatching { client.get(url, userInput.query) } } }
    .mapNotNull { deferred ->
        deferred.await().fold(
            onSuccess = { it },
            onFailure = {
                logger.error("Error during one of the calls", it)
                null
            }
        )
    }
Such an approach can be useful for aggregator sites: even if one of our providers (e.g., a car rental provider) hasn’t answered, we can still return the rest of the results.
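The aggregator scenario can be tried out with the following sketch. Everything in it is hypothetical: fetchOffer() simulates a provider call, and the provider named "broken" always throws.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical provider call: the "broken" provider always fails.
suspend fun fetchOffer(provider: String): String {
    delay(50)
    if (provider == "broken") throw IllegalStateException("$provider is down")
    return "offer from $provider"
}

// Starts all calls concurrently and keeps only the successful results, in order.
suspend fun collectOffers(providers: List<String>): List<String> = coroutineScope {
    providers
        .map { provider -> async { runCatching { fetchOffer(provider) } } }
        .mapNotNull { deferred -> deferred.await().getOrNull() }
}

fun main(): Unit = runBlocking {
    println(collectOffers(listOf("alpha", "broken", "beta")))
    // [offer from alpha, offer from beta]
}
```

One caveat worth keeping in mind: runCatching {} also captures CancellationException, so in production code we may prefer to rethrow it in order to keep cancellation cooperative.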
Similarly to awaitAll(), there is a joinAll() call for the case when we aren’t interested in the results of the operations, just in their completion, and therefore used launch {} to start them:
results.map { launch { storeToDatabase(userInput, it) } }.joinAll()
6. Fire and Forget Coroutines
It can also happen that the result of the operation isn’t interesting to us. For instance, it is an audit logging and we just want it done sometime after the operation completes. In such cases, we may use the launch {} function.
We might want to launch these fire-and-forget operations in the same context or in another one. Another context might be a child of our original context, or it might be independent. The difference is mainly in cancellation: should our main context be canceled, all of the coroutines within it will be canceled as well. The same is true for a child context: the cancellation of a parent context means the cancellation of all its children:
launch {
    logger.info("User input $userInput was processed with the result $it")
}
For the audit log, it makes sense to only complete it if the action overall is successful, so we will use the same context.
In other cases, we might want to make sure that the launched set of instructions has actually completed before moving forward. This is easy because a launch {} call returns a Job instance, which we can then join():
launch(Dispatchers.IO) {
    storeToDatabase(userInput, it)
}.join()
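The difference between a child coroutine and an independent one shows up once the parent is canceled. The sketch below is an illustration under assumptions: independentScope is a hypothetical application-wide scope, and the delays stand in for real work.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Returns the names of the coroutines that managed to finish their work.
fun runCancellationDemo(): List<String> = runBlocking {
    // Hypothetical application-wide scope: not a child of any request coroutine.
    val independentScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val completed = CopyOnWriteArrayList<String>()
    val independentJob = CompletableDeferred<Job>()

    val request = launch {
        // Child of `request`: canceled together with it.
        launch { delay(10_000); completed.add("child") }
        // Launched in another scope: survives the cancellation of `request`.
        independentJob.complete(
            independentScope.launch { delay(100); completed.add("independent") }
        )
        delay(10_000)
    }

    val survivor = independentJob.await()
    request.cancelAndJoin() // cancels `request` and its child
    survivor.join()         // the independent coroutine still completes
    independentScope.cancel()
    completed.toList()
}

fun main() {
    println(runCancellationDemo()) // [independent]
}
```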
7. Bridging Normal and Suspending Functions
The coroutine approach is easiest when the whole application is built with it in mind, like a Ktor application. However, even traditional threaded applications may benefit from coroutines, if they do many non-CPU tasks. Then the question arises: how to launch a root coroutine, which launches everything else?
The simplest way is to just use runBlocking:
runBlocking {
    processCallAsync(userInput).await()
}
Then the thread where we do this will block until the argument lambda finishes or throws. Or we can do it in a fire-and-forget style:
fun processCallAndForget(userInput: UserInput) = launch {
    processCallAsync(userInput)
}
The launch call will return a Job object, which we might want to cancel or check if it is still active (isActive). In case we want to block until it finishes, we can do that as well:
val job = launch { /*coroutine code here*/ }
runBlocking {
    job.join()
}
Note that join() is a suspending function, which is why we have to wrap it in runBlocking when calling it from regular code.
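Since launch {} needs a CoroutineScope, a traditional threaded application has to own one somewhere. One possible arrangement, shown here as a sketch with hypothetical names (CallProcessor, process()), is a service class that creates its scope and cancels it on shutdown:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical service in a threaded application that owns its coroutine scope.
class CallProcessor {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    private suspend fun process(input: String): String {
        delay(100) // stands in for the real suspending work
        return input.reversed()
    }

    // Blocks the calling thread until the result is ready.
    fun processBlocking(input: String): String = runBlocking { process(input) }

    // Returns immediately; the caller may cancel or join the Job later.
    fun processAndForget(input: String): Job = scope.launch { process(input) }

    // Cancels every coroutine that is still running in this scope.
    fun shutdown() {
        scope.cancel()
    }
}

fun main() {
    val processor = CallProcessor()
    println(processor.processBlocking("abc")) // cba

    val job = processor.processAndForget("def")
    runBlocking { job.join() }
    println(job.isCompleted) // true

    processor.shutdown()
}
```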
8. Conclusion
In this article, we carefully considered the lifecycle of each subroutine within a coroutine section of our application. Some of them are fine as simple suspend functions and will run within the same context. Others might require their own context: to execute concurrently or to survive past the coroutine that launched them.
We looked at basic approaches that can be used to split the main coroutine into several worker coroutines, and also how to join them back together and use their results. We also discussed how to call blocking code from a non-blocking context, and how to launch a non-blocking routine from traditional threaded code.
The code from the samples can be found over on GitHub.