1. Overview
Kotlin Coroutines were introduced together with a structured concurrency approach that allows developers to write asynchronous code in a synchronous way. It was quite unusual to many adopters of the technology, especially for those who came from the Rx world, where the key point is operating on sequences or streams of data.
Taking that into account, Kotlin introduced flows and channels.
In this tutorial, we’ll briefly look through these APIs, and compare their differences.
2. What Is Flow?
A Flow is an asynchronous data stream that can be observed and manipulated. It’s similar to an Observable in RxJava.
Flows are composed of events, which can represent anything from user input to data from a network request.
The Flow API provides a set of intermediate operators to control the flow of data, including mapping, filtering, and combining events. Besides intermediate there are a set of terminal operators such as collect(), reduce(), single(), toList(), etc. They are applied to the upstream flow and trigger execution of all operations, which is also called collecting the flow, based on the name of the most basic terminal operator.
It’s worth mentioning that collecting always happens in a suspending manner without blocking the thread.
2.1. Cold Streams or Hot Streams
Usually flows represent cold streams. In brief, a cold stream does not produce any values if there’s no one to collect those values. So, typically flows are cold streams: they don’t carry any information and they start producing items only when collecting starts and triggers execution.
For example, let’s create a simple flow that emits five items with a small delay. When we try to collect this flow twice, we’ll receive the same data:
@Test
fun when_collected_flow_multiple_time_then_return_same_values() = runBlocking {
val coldStream = flow {
for (i in 1..5) {
delay(100L)
emit(i)
}
}
val collect1 = buildString {
coldStream.collect { append(it).append(", ") }
}.removeSuffix(", ")
val collect2 = buildString {
coldStream.collect { append(it).append(", ") }
}.removeSuffix(", ")
assertEquals("1, 2, 3, 4, 5", collect1)
assertEquals("1, 2, 3, 4, 5", collect2)
}
Here we see that two collect() operations on the same flow – coldStream – result in two independent copies of the data.
Conversely, hot streams start producing items immediately.
Starting from kotlinx.coroutines.1.4.0 Kotlin provides an updated Flow API that includes two subtypes StateFlow and SharedFlow that represent hot streams. This means that emitted items will be different from the same running source on each collection. These two subtypes were introduced to replace ConflatedBroadcastChannel because of API complexity and also to resolve some eliminate some logical inconsistencies for the cases when Channel was used for state management.
2.2. StateFlow and SharedFlow
These flow types can live without having an active consumer and waiting for one to start collecting the flow. This means that the data is produced independently outside of the stream and won’t be affected if the flow collection has started or not. It may never even be collected. In other words, these flows are effectively hot streams.
We can convert any cold flow to a hot one using the stateIn() and shareIn() operators to StateFlow and SharedFlow respectively.
3. What Is Channel?
Now, let’s talk about channels. Like flows, they also can work with data streams. Unlike flows, channels are always hot streams. It means that the data is always produced outside of the stream.
The main goal of channels is to provide a way for coroutines to communicate with each other. Channels can send and receive streams of data.
Let’s try this by creating a coroutine using the launch builder which emits five items using the send() function. We’ll receive all of them in another coroutine using the consumeEach() function:
@Test
fun given_channel_when_pass_data_from_one_coroutine_then_receive_in_another() = runBlocking {
val channel = Channel<Int>()
launch { // coroutine #1
for (i in 1..5) {
delay(100L)
channel.send(i)
}
channel.close()
}
val result = async { // coroutine #2
buildString {
channel.consumeEach {
append(it).append(", ")
}
}.removeSuffix(", ")
}
assertEquals("1, 2, 3, 4, 5", result.await())
}
Conceptually, the channel works similarly to BlockingQueue in Java. The key difference of Channel is that instead of blocking operations – put() and take() – it has respective suspending ones – send() and receive().
We can create different types of channels using the factory function Channel() where we need to specify its capacity.
We can also configure buffered channels with additional onBufferOverflow parameter. This allows us to control the behavior of the channel’s send() function when the buffer becomes full.
4. Difference Between Flows and Channels in Kotlin
So, now that we’re familiar with both concepts, let’s look at the differences between flows and channels in Kotlin.
4.1. Difference on API Level
The first and most obvious difference is that flows are usually cold, and channels are always hot data streams. Channels start emitting data immediately no matter whether anything actually listens/collects/receives their data. This is the usual case, though, after the introduction of StateFlow and SharedFlow, flows can also be hot.
Unlike Channel, StateFlow always has an initial value. There’s a clear separation between mutable and immutable implementations. So when we need to restrict the emission we can expose the immutable version to the outside, which isn’t possible with the Channel API.
Both SharedFlow and StateFlow are subtypes of the Flow interface. It means we don’t need to learn a new API but keep using the Flow API, which is simpler than the Channel API but still powerful.
4.2. Scoping
We may also notice another big difference between flows and channels. Channels aren’t scoped to a specific lifecycle, unlike flows. If we want to close/cancel a Channel, we need to explicitly use the methods provided by the Channel API. So, let’s say we need to have a data stream that survives across the screen lifecycle, then we have to use channels.
On the other hand, if we need to scope the emissions to a certain component’s lifecycle, flows are handy. For instance, if we’re using the MVVM architecture and viewModelScope, all the flows started with that scope will be canceled when ViewModel gets cleared.
4.3. Sequential Data Processing
Flows are a good fit for sequential data processing.
For example, we are working with multiple data sources (local and remote) in our app. A typical approach in this use case would be to check if there’s some data in local storage and return it. Then make an API call to fetch some new data, and store it locally. And finally, to return the updated data from the local storage.
We could start with a function to fetch data from remote data source:
fun getRemoteItems(): Flow<List<Item>> = callbackFlow {
val remoteItems = // get a list of items from a remote data source
trySend(remoteItems)
}
And for the local storage, we have a DAO interface:
interface ItemsDao {
fun getAllItems()
fun insertAllItems(items: List<Item>)
}
Given this input we can deliver the data to the user from multiple data sources:
fun getItems(): Flow<List<Item>> {
val items = itemsDao.getAllItems()
return getRemoteItems()
.onStart { emit(items) }
.onEach { result ->
if (result != items) {
itemsDao.insertAllItems(result)
}
}
}
The function emits data from local storage to the flow as the first item, while waiting for the remote source to send some newer data. If the data is different we’ll save it to the local storage. To see this data we have to collect it from the flow:
getItems()
.distinctUntilChanged()
.collect { items ->
println(items)
}
4.4. Producer-Consumer Pattern
When we’re talking about channels, there is always a sender and receiver (and most likely a buffer). So Channel plays the role of a communication mechanism for synchronization points in two or more concurrent executions.
In concurrent programming, we often can find a code where one entity that performs concurrent code (consumer) awaits some data from another entity (producer). We can recognize here a producer-consumer pattern.
Channels help us to implement this pattern in Kotlin when working with coroutines. So, we can satisfy any scenario that requires using a producer-consumer (or publisher-subscriber) pattern:
There is a convenient coroutine builder named produce { … } that makes it easy to do it right on the producer side, and an extension function consumeEach(), that replaces a for loop on the consumer side:
@Test
fun when_use_produce_then_consumeEach_receives_all_values() = runBlocking {
val channel = Channel<Int>()
produce<Int> {
for (i in 1..5) {
channel.send(i)
}
channel.close()
}
val result = async {
buildString {
channel.consumeEach {
append(it).append(", ")
}
}.removeSuffix(", ")
}
assertEquals(result.await(), "1, 2, 3, 4, 5")
}
There is an alternative way to create a channel – we can convert Flow using the produceIn() function. It creates a producer channel with a default buffer size, which is 0. To override this we need to use the buffer operator and specify the required capacity and/or onBufferOverflow parameter before calling produceIn():
@Test
fun when_flow_produceIn_then_consume_all_values() = runBlocking {
val channel = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.buffer(
capacity = 2,
onBufferOverflow = BufferOverflow.SUSPEND
).produceIn(this)
val result = async {
buildString {
channel.consumeEach {
append(it).append(", ")
}
}.removeSuffix(", ")
}
assertEquals(result.await(), "1, 2, 3, 4, 5")
}
5. Conclusion
In this article, we’ve seen the difference between Flow and Channel concepts, their purposes, and usage.
We have learned that all operations inside a Flow execute sequentially, so if we need to process data sequentially, flows will be a good choice. While channels are a better fit for communication between different coroutines, or any use case that requires a producer-consumer approach.
Another significant difference is that flows require scoping the emission to a certain component’s lifecycle, while channels don’t have this requirement and we need to explicitly close/cancel the Channel to avoid leaks.
There are also SharedFlow and StateFlow, which combine the benefits of both concepts.
All examples and code snippets from this tutorial can be found over on GitHub.