1. Introduction

Kotlin Flow provides a concise API for working with asynchronous data streams. In some scenarios, we need to concatenate two flows sequentially so that the elements of the second flow are emitted only after the first flow has emitted all of its elements.

In this tutorial, we’ll investigate various approaches for sequential concatenation of Kotlin flows.

2. Using a Custom Flow Builder

We can create a custom flow builder to sequentially collect from the first flow and then from the second flow:

fun concatenateFlowsUsingCustomBuilder(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
    flow1.collect { emit(it) }
    flow2.collect { emit(it) }
}

This helper function uses the flow builder to collect both flows in order. We call collect() on flow1 and re-emit each element with emit(). Because collect() suspends until flow1 completes, we only start collecting flow2 afterward. This ensures that flow2 starts emitting values only after flow1 has finished.

Let’s test it:

@Test
fun `concatenate two flows using custom flow builder`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingCustomBuilder(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

The test collects the concatenated flow into a list with toList() and verifies that the elements of flow1 come first, followed by those of flow2.
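To make the ordering guarantee more visible, here is a minimal sketch that records when each source flow starts. The recordConcatenationEvents() helper, the 50 ms delay, and the event strings are assumptions made for illustration; the builder is repeated only so that the snippet is self-contained:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Same builder as above, repeated so the sketch compiles on its own.
fun concatenateFlowsUsingCustomBuilder(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
    flow1.collect { emit(it) }
    flow2.collect { emit(it) }
}

// Hypothetical helper: logs when each source starts and which values arrive.
fun recordConcatenationEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // The first flow is deliberately slow.
    val slowFlow = flow {
        for (i in 1..3) {
            delay(50)
            emit(i)
        }
    }.onStart { events += "flow1 started" }

    // The second flow could emit immediately, but it is not collected yet.
    val fastFlow = flowOf(4, 5, 6).onStart { events += "flow2 started" }

    concatenateFlowsUsingCustomBuilder(slowFlow, fastFlow)
        .collect { events += "emitted $it" }

    events
}
```

Even though the second flow has no delay, its start event should only be recorded after the last element of the first flow, because flows are cold and the builder doesn't begin collecting flow2 until flow1.collect() returns.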

3. Using the flattenConcat() Method

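Another option is the flattenConcat() operator, which flattens a flow of flows by collecting the inner flows one at a time, in order. Depending on the kotlinx.coroutines version, it’s marked as a preview or experimental API, so the compiler may ask us to opt in: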

@Test
fun `concatenate two flows using flattenConcat method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flowOf(flow1, flow2).flattenConcat().toList()
        
    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

Here, flowOf(flow1, flow2) creates a flow of flows. The flattenConcat() operator then collects the inner flows one after another, so flow2 starts emitting only after flow1 has emitted all of its values.

Finally, we verify that the resulting list contains the elements in the expected order.
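Because flattenConcat() works on a flow of flows, the same idea extends to any number of sources. The following is a sketch under that assumption; concatenateAll() and concatenateAllExample() are hypothetical names, and both opt-in markers are listed because the one that's required depends on the library version:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: concatenates the given flows in list order.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> concatenateAll(flows: List<Flow<T>>): Flow<T> =
    flows.asFlow().flattenConcat() // turn the list into a flow of flows, then flatten it

// Illustrative usage with three small flows.
fun concatenateAllExample(): List<Int> = runBlocking {
    concatenateAll(listOf(flowOf(1, 2), flowOf(3), flowOf(4, 5))).toList()
}
```

Here, asFlow() converts the list into a Flow<Flow<T>>, which flattenConcat() then drains one inner flow at a time.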

4. Using the onCompletion() Method

Another approach is the onCompletion() operator, which lets us run an action, including emitting further elements, once the upstream flow finishes:

@Test
fun `concatenate two flows using onCompletion method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flow1.onCompletion { emitAll(flow2) }.toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

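We call onCompletion() on flow1 and emit flow2 from its action with emitAll(), so flow2 starts emitting values only after flow1 has completed. Note that the action also runs when flow1 fails or is cancelled. In that case, it receives a non-null cause, and attempting to emit rethrows that exception, so the elements of flow2 are appended only on successful completion.

Finally, we convert the result to a list using toList() and verify its contents.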
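Since the onCompletion() action receives the completion cause as a parameter, we can make the intent explicit by checking it before emitting. The sketch below assumes we only want to append the second flow after a normal completion; thenEmit() and the two example functions are hypothetical names introduced for illustration:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: appends `next` only if the upstream completed without a cause.
fun <T> Flow<T>.thenEmit(next: Flow<T>): Flow<T> =
    onCompletion { cause -> if (cause == null) emitAll(next) }

// Normal completion: the second flow is appended.
fun successfulUpstreamExample(): List<Int> = runBlocking {
    flowOf(1, 2, 3).thenEmit(flowOf(4, 5, 6)).toList()
}

// Failing upstream: the second flow is skipped.
fun failingUpstreamExample(): List<Int> = runBlocking {
    val failing = flow<Int> {
        emit(1)
        throw IllegalStateException("upstream failed")
    }
    failing.thenEmit(flowOf(4, 5, 6))
        .catch { /* swallow the failure so we can inspect what was emitted */ }
        .toList()
}
```

With the explicit check, a failing upstream never starts collecting the second flow, which may matter if that flow has side effects when it starts.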

5. Using the collect() and emitAll() Methods

Alternatively, we can combine collect() with emitAll() inside a flow builder. The emitAll() function is shorthand for collecting a flow and emitting each of its values:

fun concatenateFlowsUsingEmitAll(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> = flow {
    flow1.collect { emit(it) }
    emitAll(flow2)
}

Here, we first collect flow1 and re-emit each of its elements. Only after that collection completes does emitAll(flow2) begin emitting the elements of the second flow.

Consequently, the resulting flow preserves the order of both sources without interleaving their elements.

Let’s try it out:

@Test
fun `concatenate two flows using collect and emitAll method`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingEmitAll(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}
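The same pattern can be written generically. As a sketch, the hypothetical concatWith() extension below uses emitAll() for every source and accepts any number of additional flows; neither the name nor the vararg signature comes from the coroutines library:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical extension: emits the receiver first, then each of `others` in order.
fun <T> Flow<T>.concatWith(vararg others: Flow<T>): Flow<T> = flow {
    emitAll(this@concatWith)          // the receiver flow goes first
    others.forEach { emitAll(it) }    // then every additional flow, one at a time
}

// Illustrative usage with three small flows.
fun concatWithExample(): List<Int> = runBlocking {
    flowOf(1, 2).concatWith(flowOf(3), flowOf(4, 5)).toList()
}
```

Inside the builder, this refers to the FlowCollector, so we use the labeled this@concatWith to reach the receiver flow.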

6. Using the Flux.concat() Method

In this approach, we use Project Reactor’s Flux.concat() to concatenate two Kotlin flows sequentially.

6.1. Dependency

To use Project Reactor, we need both the Reactor core library and the kotlinx-coroutines-reactor bridge module.

For Maven, we add these dependencies to the pom.xml file:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
</dependency>

For Gradle, we add these dependencies to the build.gradle file:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.0"
implementation "io.projectreactor:reactor-core:3.4.11"

6.2. Usage

Now, let’s use the Flux class to achieve our goal:

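import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import reactor.core.publisher.Flux

fun concatenateFlowsUsingReactive(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
    // Bridge each Kotlin flow to a Reactor Flux, concatenate, and bridge back
    val flux1 = flow1.asFlux()
    val flux2 = flow2.asFlux()
    return Flux.concat(flux1, flux2).asFlow()
}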

This function converts each Kotlin flow into a Reactor Flux using asFlux(). Next, Flux.concat() subscribes to the sources one after another, and asFlow() converts the result back to a Kotlin Flow. Let’s test it:

@Test
fun `concatenate two flows using reactive`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = concatenateFlowsUsingReactive(flow1, flow2).toList()

    assertEquals(listOf(1, 2, 3, 4, 5, 6), result)
}

7. Conclusion

In this article, we’ve explored various ways of concatenating two Kotlin Flows sequentially.

First, we examined a custom flow builder, which manually collects and re-emits the elements of both flows. We also looked at the flattenConcat() method, which flattens a flow of flows for sequential emission.

Additionally, we covered the onCompletion() method to chain another flow upon completion of the first, and the collect() and emitAll() methods to gather and emit values in sequence.

Finally, we leveraged Project Reactor’s Flux.concat() to achieve sequential concatenation by converting flows to Reactor Flux objects and back.

When deciding which approach to use, readability and project context usually matter more than performance, since every approach here collects the source flows one after another. A custom flow builder, emitAll(), or onCompletion() keeps simple cases clear and needs no extra dependencies. The flattenConcat() operator fits naturally when we already have a flow of flows or an arbitrary number of flows to join. Finally, Flux.concat() is mainly worthwhile in a codebase that already relies on Project Reactor, because converting between Flow and Flux adds a bridging step.

