1. Overview
Coroutines are the preferred way to build non-blocking, concurrent applications in Kotlin. In this tutorial, we’ll learn about channels. They allow coroutines to communicate with each other.
2. What Is a Channel?
A channel is conceptually similar to a queue. One or more producer coroutines write to a channel. One or more consumer coroutines can read from the same channel. A channel has a suspending send function and a suspending receive function. This means that several coroutines can use channels to pass data to each other in a non-blocking fashion.
Let’s see an example of a Channel:
@Test
fun should_pass_data_from_one_coroutine_to_another() {
    runBlocking {
        // given
        val channel = Channel<String>()
        // when
        launch { // coroutine1
            channel.send("Hello World!")
        }
        val result = async { // coroutine2
            channel.receive()
        }
        // then
        assertThat(result.await()).isEqualTo("Hello World!")
    }
}
First, we create a channel. Next, we launch coroutine1, which sends the value “Hello World!” to the channel. Finally, we create coroutine2 using the async coroutine builder, so that it returns a result when it finishes. The channel.receive() call inside coroutine2 returns the value written by coroutine1.
3. Types of Channels
There are four types of channels, and they differ in the number of values they can hold at a time. Let’s take a detailed look at each type.
3.1. Rendezvous Channel
A rendezvous channel has no buffer. The sending coroutine suspends until a receiver coroutine invokes receive on the channel. Similarly, a consuming coroutine suspends until a producer coroutine invokes send on the channel. We create a rendezvous channel using the default Channel constructor with no arguments.
Let’s see an example of this type of channel:
val basket = Channel<String>()
launch { // coroutine1
    val fruits = listOf("Apple", "Orange")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}
launch { // coroutine2
    repeat(2) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}
Let’s see the output of this program:
coroutine1: Sending Apple
coroutine2: Received Apple
coroutine1: Sending Orange
coroutine2: Received Orange
Coroutine1 tries to send the value “Apple” and immediately suspends, as there are no receivers yet. After its delay, coroutine2 receives this value and then suspends again in its next iteration, since no further value is available yet. Coroutine1 now resumes and sends the next value to the channel.
3.2. Buffered Channel
As the name suggests, a buffered channel has a predefined buffer. We can specify the capacity of the buffer in the Channel constructor.
Let’s change the previous example to see an example of a buffered channel:
val basket = Channel<String>(1)
launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}
launch { // coroutine2
    repeat(3) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}
Let’s see the output of this program:
coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine2: Received Apple
coroutine1: Sending Banana
coroutine2: Received Orange
coroutine2: Received Banana
This time, coroutine1 writes “Apple” without suspending. But it suspends when trying to write “Orange”. This is because we created the channel with a buffer capacity of one.
So, it can hold one value in the buffer even if there are no receivers receiving this value at the moment, but coroutine1 must wait (suspend) before writing more values to the channel since the buffer is full.
Once coroutine2 reads the value from the buffer, coroutine1 un-suspends and writes the next value to the channel.
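To observe the buffer limit without relying on the timing of two coroutines, we can use the non-suspending trySend function, which reports failure instead of suspending when the buffer is full. The following is a minimal sketch, assuming a recent kotlinx.coroutines version that provides trySend; the fillBuffer helper is a hypothetical name introduced only for this illustration:

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to put three values into a channel of the
// given capacity and reports which attempts succeeded. Nobody receives.
fun fillBuffer(capacity: Int): List<Boolean> {
    val basket = Channel<String>(capacity)
    return listOf("Apple", "Orange", "Banana").map { fruit ->
        // trySend never suspends; it fails immediately if the buffer is full
        basket.trySend(fruit).isSuccess
    }
}

fun main() {
    println(fillBuffer(1)) // [true, false, false]
    println(fillBuffer(2)) // [true, true, false]
}
```

With a capacity of one, only the first value fits. A suspending send would wait at the second value instead of failing, which is the behavior we saw above.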
3.3. Unlimited Channel
An unlimited channel has a buffer of unlimited capacity. However, we should be aware that we may run into an OutOfMemoryError if producers outpace consumers and the buffer grows until all of the available memory is exhausted. We can create an unlimited channel by providing the special constant UNLIMITED to the Channel constructor.
Let’s see an example of this channel:
val channel = Channel<Int>(UNLIMITED)
launch { // coroutine1
    repeat(100) {
        println("coroutine1: Sending $it")
        channel.send(it)
    }
}
launch { // coroutine2
    repeat(100) {
        println("coroutine2: Received ${channel.receive()}")
    }
}
Let’s examine the output of this program:
coroutine1: Sending 0
coroutine1: Sending 1
...
coroutine1: Sending 98
coroutine1: Sending 99
coroutine2: Received 0
coroutine2: Received 1
...
coroutine2: Received 98
coroutine2: Received 99
As we can see, coroutine1 writes all 100 values to the channel without ever suspending, thanks to the unlimited buffer capacity.
3.4. Conflated Channel
In a conflated channel, the most recently written value overrides the previously written value. Therefore, the send method of the channel never suspends. The receive method receives only the latest value.
Let’s see an example of this type of channel:
val basket = Channel<String>(CONFLATED)
launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}
launch { // coroutine2
    println("coroutine2: Received ${basket.receive()}")
}
Let’s examine the output of this program:
coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine1: Sending Banana
coroutine2: Received Banana
We see that coroutine1 sends three values to the channel, but coroutine2 receives only the last value. This is because coroutine2 is a slow consumer. By the time it reads from the basket, coroutine1 has overwritten previously written values.
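The overwriting behavior can also be shown in a single coroutine, because send on a conflated channel never suspends. This is a small sketch rather than part of the original examples; latestOf is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends every value into a conflated channel,
// then receives once. Expects a non-empty list.
fun latestOf(values: List<String>): String = runBlocking {
    val basket = Channel<String>(Channel.CONFLATED)
    for (value in values) {
        basket.send(value) // returns immediately, replacing any unread value
    }
    basket.receive() // only the most recent value is still in the channel
}

fun main() {
    println(latestOf(listOf("Apple", "Orange", "Banana"))) // Banana
}
```

On a rendezvous channel the same code would suspend forever at the first send, since no other coroutine is receiving.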
4. Implementing Producer-Consumer With Channels
In concurrent programs, we often need to implement a program that produces a sequence of values. Another program consumes these values as and when they become available.
The two programs run simultaneously but they share a communication mechanism to pass values to each other. This is commonly known as the producer-consumer pattern.
Let’s see how we can implement the producer-consumer pattern using Kotlin coroutines and channels.
4.1. One Consumer Consuming From One Producer
We can use the produce coroutine builder method to create a producer coroutine.
Let’s see how we can create a producer:
fun CoroutineScope.produceFruits(): ReceiveChannel<String> = produce {
    val fruits = listOf("Apple", "Orange", "Apple")
    for (fruit in fruits) send(fruit)
}
Here we note that the produce coroutine returns a ReceiveChannel. The ReceiveChannel has only the receive method. It does not have a send method. This means that another coroutine can only read from this output channel.
We can use the regular for loop syntax to iterate over the values in the ReceiveChannel. The loop ends once the channel is closed and all of its values have been received.
Let’s now see how we can consume the values from the producer:
val fruitChannel = produceFruits()
for (fruit in fruitChannel) {
    println(fruit)
}
println("End!")
Let’s see the output of this consumer:
Apple
Orange
Apple
End!
As we can see, the consumer code receives the values in the order they were produced by the producer. The producer and consumer coroutines run concurrently. They use fruitChannel to communicate with each other.
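The for loop over the producer's channel terminates because produce closes its channel when the producer block completes. With a manually created channel, we have to call close ourselves. The following sketch illustrates this under that assumption; collectUntilClosed is a hypothetical name used only here:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a producer coroutine sends two values and closes
// the channel; the consumer loop collects values until the channel is closed.
fun collectUntilClosed(): List<String> = runBlocking {
    val channel = Channel<String>()
    launch {
        for (fruit in listOf("Apple", "Orange")) channel.send(fruit)
        channel.close() // without this, the loop below would suspend forever
    }
    val received = mutableListOf<String>()
    for (fruit in channel) received += fruit // ends once the channel is closed and drained
    received
}

fun main() {
    println(collectUntilClosed()) // [Apple, Orange]
}
```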
4.2. Several Consumers Consuming From One Producer
We can create several consumers that consume values produced by one producer. This way we can distribute work among several consumers.
Let’s create a producer that produces ten pizza orders per second:
fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
    var x = 1
    while (true) {
        send("Pizza Order No. ${x++}")
        delay(100)
    }
}
Let’s now create a pizza order processor – a consumer:
fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
    for (order in orders) {
        println("Processor #$id is processing $order")
    }
}
This coroutine takes the orders channel as an input parameter. New pizza orders will arrive on this channel.
Now let’s run three instances of pizza order processor and distribute the work among them:
fun main() = runBlocking {
    val pizzaOrders = producePizzaOrders()
    repeat(3) {
        pizzaOrderProcessor(it + 1, pizzaOrders)
    }
    delay(1000)
    pizzaOrders.cancel()
}
Let’s examine the output of this program:
Processor #1 is processing Pizza Order No. 1
Processor #1 is processing Pizza Order No. 2
Processor #2 is processing Pizza Order No. 3
Processor #3 is processing Pizza Order No. 4
Processor #1 is processing Pizza Order No. 5
Processor #2 is processing Pizza Order No. 6
Processor #3 is processing Pizza Order No. 7
Processor #1 is processing Pizza Order No. 8
Processor #2 is processing Pizza Order No. 9
Processor #3 is processing Pizza Order No. 10
We see that the order processing work is almost equally distributed among the three processors.
4.3. One Consumer Consuming From Several Producers
We can write to a channel from several producer coroutines. A consumer coroutine can read all messages from that channel.
Let’s create two producers. One producer will fetch YouTube videos, and another will fetch tweets:
suspend fun fetchYoutubeVideos(channel: SendChannel<String>) {
    val videos = listOf("cat video", "food video")
    for (video in videos) {
        delay(100)
        channel.send(video)
    }
}
suspend fun fetchTweets(channel: SendChannel<String>) {
    val tweets = listOf("tweet: Earth is round", "tweet: Coroutines and channels are cool")
    for (tweet in tweets) {
        delay(100)
        channel.send(tweet)
    }
}
Now let’s launch both producers and consume the values they produce:
fun main() = runBlocking {
    val aggregate = Channel<String>()
    launch { fetchYoutubeVideos(aggregate) }
    launch { fetchTweets(aggregate) }
    repeat(4) {
        println(aggregate.receive())
    }
    coroutineContext.cancelChildren()
}
Let’s check the output of this program:
cat video
tweet: Earth is round
food video
tweet: Coroutines and channels are cool
We see that we receive values produced by both producers in the aggregate channel.
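The example above works because the consumer knows that exactly four values will arrive. When the count isn't known in advance, one option is to close the channel once every producer has finished, so that the consumer can use a for loop. The following is only a sketch of that idea; emitAll and aggregateAll are hypothetical names standing in for the producers and the main function above:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the two fetch functions above.
suspend fun emitAll(items: List<String>, channel: SendChannel<String>) {
    for (item in items) {
        delay(100)
        channel.send(item)
    }
}

fun aggregateAll(): List<String> = runBlocking {
    val aggregate = Channel<String>()
    launch {
        // coroutineScope returns only after both child producers have finished
        coroutineScope {
            launch { emitAll(listOf("cat video", "food video"), aggregate) }
            launch { emitAll(listOf("tweet: Earth is round"), aggregate) }
        }
        aggregate.close() // lets the consumer's for loop terminate
    }
    val received = mutableListOf<String>()
    for (item in aggregate) received += item
    received
}

fun main() {
    aggregateAll().forEach(::println)
}
```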
5. Pipelines Using Channels
We can combine several producers and consumers in a chain to create a pipeline for data processing. Let’s take the example of a shop that makes pizzas. We can divide the pizza-making process into several steps. For the sake of simplicity, we’ll divide it into two steps – baking and topping.
Let’s now see how we can implement these steps using coroutines. The baking coroutine produces a basic baked pizza that is consumed by the topping coroutine. The topping coroutine applies the necessary toppings, and the output is ready for serving.
Let’s see a simple implementation of the baking and topping coroutines:
fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(200)
        println("Baking ${order.orderNumber}")
        send(order.copy(orderStatus = BAKED))
    }
}
fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(50)
        println("Topping ${order.orderNumber}")
        send(order.copy(orderStatus = TOPPED))
    }
}
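These snippets rely on a PizzaOrder type and on the BAKED and TOPPED status values, none of which are shown. A plausible definition, given here purely as an assumption, is a data class with an order number and a status; the OrderStatus enum name and the ORDERED default are hypothetical:

```kotlin
// Hypothetical status values; the snippets only reference BAKED and TOPPED.
enum class OrderStatus { ORDERED, BAKED, TOPPED }

// Hypothetical order type with the two properties the pipeline stages use.
data class PizzaOrder(
    val orderNumber: Int,
    val orderStatus: OrderStatus = OrderStatus.ORDERED
)

fun main() {
    // copy creates a new order with an updated status, as the stages do
    val baked = PizzaOrder(orderNumber = 1).copy(orderStatus = OrderStatus.BAKED)
    println(baked) // PizzaOrder(orderNumber=1, orderStatus=BAKED)
}
```

With this definition, referring to BAKED and TOPPED without a qualifier requires importing the enum entries, for example with a wildcard import of OrderStatus.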
Let’s create another coroutine for producing a given number of dummy pizza orders:
fun CoroutineScope.produceOrders(count: Int) = produce {
    repeat(count) {
        delay(50)
        send(PizzaOrder(orderNumber = it + 1))
    }
}
Finally, let’s combine all of these coroutines to create a pipeline:
fun main() = runBlocking {
    val orders = produceOrders(3)
    val readyOrders = topping(baking(orders))
    for (order in readyOrders) {
        println("Serving ${order.orderNumber}")
    }
    delay(3000)
    coroutineContext.cancelChildren()
}
At first, we create three pizza orders. Then we pass the orders through the baking and topping coroutines in order. Finally, we iterate over the ready orders and serve each one as it arrives.
Let’s check the output:
Baking 1
Topping 1
Serving 1
Baking 2
Topping 2
Serving 2
Baking 3
Topping 3
Serving 3
As we can see, each pizza order passes through the baking, topping, and serving steps in the expected order.
6. Ticker Channels
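A ticker channel is the coroutine equivalent of a traditional timer. It produces a Unit value at a specified regular interval. This type of channel is useful for performing a job at a regular interval. Note that the ticker function is marked with @ObsoleteCoroutinesApi, so its API may change in future releases.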
Let’s take an example of a simple stock price fetcher. Our program will fetch the price of a given stock every five seconds. Let’s see the implementation using the ticker channel:
fun stockPrice(stock: String): Double {
    log("Fetching stock price of $stock")
    return Random.nextDouble(2.0, 3.0)
}
fun main() = runBlocking {
    val tickerChannel = ticker(Duration.ofSeconds(5).toMillis())
    repeat(3) {
        tickerChannel.receive()
        log(stockPrice("TESLA"))
    }
    delay(Duration.ofSeconds(11).toMillis())
    tickerChannel.cancel()
}
Let’s check the output:
14:11:18 - Fetching stock price of TESLA
14:11:18 - 2.7380844072456583
14:11:23 - Fetching stock price of TESLA
14:11:23 - 2.3459508859536635
14:11:28 - Fetching stock price of TESLA
14:11:28 - 2.3137592916266994
Here we see that a new stock price is printed every five seconds. When we’re done, we stop the ticker channel by calling the cancel method on it.
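The log function used in this example isn't shown. Judging only by the output format, it might look something like the following sketch; both formatLog and this version of log are assumptions, not the original helper:

```kotlin
import java.time.LocalTime
import java.time.format.DateTimeFormatter

val timeFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss")

// Hypothetical helper: formats a message the way the sample output does.
fun formatLog(time: LocalTime, message: Any?): String =
    "${time.format(timeFormat)} - $message"

// Hypothetical stand-in for log; it accepts Any? so that both
// strings and the Double price can be passed to it.
fun log(message: Any?) = println(formatLog(LocalTime.now(), message))

fun main() {
    println(formatLog(LocalTime.of(14, 11, 18), "Fetching stock price of TESLA"))
    // prints "14:11:18 - Fetching stock price of TESLA"
}
```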
7. Conclusion
In this tutorial, we’ve learned what channels are and how we can use them with coroutines to write asynchronous programs. We also implemented the producer-consumer and pipeline patterns using coroutines and channels.
As usual, all the examples are available over on GitHub.