1. Overview
Using parallel operations on Kotlin collections allows us to process elements in a collection concurrently, taking advantage of multiple processor cores to improve performance. This can be very useful for computationally intensive tasks such as filtering, mapping, and data reduction.
In this article, we’ll discuss some approaches to perform parallel operations on Kotlin collections.
2. Parallel Operations on Collections
To explain how parallel operations work, we’ll use the following collection:
data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

private val people = listOf(
    Person("Martin", 12),
    Person("Ahmad", 42),
    Person("Alina", 13),
    Person("Alice", 30),
    Person("Bob", 16),
    Person("Charlie", 40)
)
In our examples, we’ll assign adult status (isAdult = true) if age is greater than or equal to 18 and isAdult = false if it is less than 18.
To make the parallel operations easier to follow, we’ll also log each element as it is processed; the log output includes the timestamp and the name of the thread that did the work:
private fun Person.setAdult() {
    this.isAdult = this.age >= 18
    logger.info(this.toString())
}
We expect the result to be a list of the people over the age of 15, sorted by age:
private fun List<Person>.assertOver15AndSortedByAge() {
    assertThat(this).containsExactly(
        Person("Bob", 16, false),
        Person("Alice", 30, true),
        Person("Charlie", 40, true),
        Person("Ahmad", 42, true)
    )
}
We’ll use this extension function, List<Person>.assertOver15AndSortedByAge(), to verify the result of each approach.
2.1. Using Coroutines
Coroutines can be relied on for parallel operations because they’re non-blocking, lightweight, flexible and allow us to run multiple tasks concurrently:
// Runs inside a coroutine scope, such as runBlocking { ... }
val filteredPeople = people
    .map { person ->
        async(Dispatchers.IO) {
            person.setAdult()
            person
        }
    }.awaitAll()
    .filter { it.age > 15 }
    .sortedBy { it.age }

filteredPeople.assertOver15AndSortedByAge()
In people.map { person -> … }, we create a new coroutine for each person object using async { … }.
This allows coroutines to execute concurrently with other coroutines and the main thread.
Without an explicit dispatcher, async inherits the dispatcher of its parent scope. Inside runBlocking, for example, that is a single-threaded event loop on the calling thread, so the coroutines would run concurrently but not in parallel.
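The effect of the dispatcher can be checked with a small experiment. The following is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; the two helper functions are hypothetical names introduced only for this illustration. Each one starts a few coroutines and reports which thread ran them:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts n coroutines without an explicit dispatcher
// and returns the thread each one ran on.
fun threadsWithInheritedDispatcher(n: Int): List<Thread> = runBlocking {
    (1..n).map { async { Thread.currentThread() } }.awaitAll()
}

// Hypothetical helper: same as above, but every coroutine starts on Dispatchers.IO.
fun threadsWithIoDispatcher(n: Int): List<Thread> = runBlocking {
    (1..n).map { async(Dispatchers.IO) { Thread.currentThread() } }.awaitAll()
}

fun main() {
    val caller = Thread.currentThread()
    // Inherited dispatcher: every coroutine runs on the thread that called runBlocking
    println(threadsWithInheritedDispatcher(4).all { it === caller })
    // Dispatchers.IO: the coroutines run on worker threads instead
    println(threadsWithIoDispatcher(4).none { it === caller })
}
```

The sketch compares Thread instances rather than thread names, because the coroutine debug mode appends the coroutine name to the thread name, as seen in log output such as main @coroutine#1.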
We can look at the log output to see the operations running on different worker threads:
15:02:25.931 [main @coroutine#1] INFO - Using Coroutines
15:02:25.944 [DefaultDispatcher-worker-1 @coroutine#2] INFO - Person(name=Martin, age=12, isAdult=false)
15:02:25.945 [DefaultDispatcher-worker-6 @coroutine#5] INFO - Person(name=Alice, age=30, isAdult=true)
15:02:25.945 [DefaultDispatcher-worker-7 @coroutine#6] INFO - Person(name=Bob, age=16, isAdult=false)
15:02:25.944 [DefaultDispatcher-worker-3 @coroutine#3] INFO - Person(name=Ahmad, age=42, isAdult=true)
15:02:25.944 [DefaultDispatcher-worker-4 @coroutine#7] INFO - Person(name=Charlie, age=40, isAdult=true)
15:02:25.944 [DefaultDispatcher-worker-2 @coroutine#4] INFO - Person(name=Alina, age=13, isAdult=false)
15:02:25.950 [main @coroutine#1] INFO - Total time taken: 17 ms
The awaitAll() call suspends until every coroutine created in the map step has completed and returns their results in the original order, so the filteredPeople list reflects the outcome of all the parallel processing.
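The map, async and awaitAll() pattern can also be wrapped in a reusable helper. The following is a minimal sketch, not part of kotlinx.coroutines: parallelMap is a hypothetical name, and Dispatchers.Default is an assumption made here because it is intended for CPU-bound work, whereas the example above uses Dispatchers.IO:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: transforms every element in its own coroutine
// and returns the results in the order of the original elements.
suspend fun <T, R> Iterable<T>.parallelMap(transform: suspend (T) -> R): List<R> =
    withContext(Dispatchers.Default) {
        this@parallelMap
            .map { item -> async { transform(item) } } // start one coroutine per element
            .awaitAll()                                 // suspend until all have finished
    }

fun main() = runBlocking {
    println(listOf(1, 2, 3).parallelMap { it * 2 })
}
```

Because withContext() only returns once all of its child coroutines have completed, callers of this helper don't need to manage the Deferred instances themselves.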
2.2. Using Kotlin Flow
Coroutines Flow, often called Kotlin Flow or simply Flow, is an API in the kotlinx.coroutines library, built on top of coroutines, for handling streams of data asynchronously.
We can use flatMapMerge() to process elements in a Flow in parallel:
val filteredPeople = people.asFlow()
    .flowOn(Dispatchers.IO)
    .flatMapMerge { person ->
        flow {
            person.setAdult()
            emit(person)
        }
    }
    .filter { it.age > 15 }
    .toList()
    .sortedBy { it.age }

filteredPeople.assertOver15AndSortedByAge()
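The code processes each person object in the people list in its own coroutine. Note that in this run the coroutines all execute on the main thread, so the work is concurrent rather than parallel: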
15:03:06.538 [main @coroutine#1] INFO - Using Kotlin Flow
15:03:06.585 [main @coroutine#4] INFO - Person(name=Martin, age=12, isAdult=false)
15:03:06.587 [main @coroutine#5] INFO - Person(name=Ahmad, age=42, isAdult=true)
15:03:06.587 [main @coroutine#6] INFO - Person(name=Alina, age=13, isAdult=false)
15:03:06.587 [main @coroutine#7] INFO - Person(name=Alice, age=30, isAdult=true)
15:03:06.588 [main @coroutine#8] INFO - Person(name=Bob, age=16, isAdult=false)
15:03:06.588 [main @coroutine#9] INFO - Person(name=Charlie, age=40, isAdult=true)
15:03:06.591 [main @coroutine#1] INFO - Total time taken: 50 ms
However, flatMapMerge() is an experimental API in Kotlin Coroutines, which means it isn’t yet stable and may change in future versions. To use it, we must opt in with an annotation:
@OptIn(ExperimentalCoroutinesApi::class)
We can apply this annotation to the enclosing class or function.
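Since flowOn() only changes the context of the operators upstream of it, one way to move the per-element work onto a thread pool is to apply it to the inner flow instead. The following is a minimal sketch under that assumption; the function name, the use of Dispatchers.Default and the concurrency value of 4 are choices made for this illustration, not taken from the example above:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: squares each number in an inner flow and records
// whether that work ran on a thread other than the caller's.
@OptIn(ExperimentalCoroutinesApi::class)
fun squaresOffCallerThread(numbers: List<Int>): List<Pair<Int, Boolean>> {
    val caller = Thread.currentThread()
    return runBlocking {
        numbers.asFlow()
            .flatMapMerge(concurrency = 4) { n ->   // at most 4 inner flows collected at once
                flow {
                    emit(n * n to (Thread.currentThread() !== caller))
                }.flowOn(Dispatchers.Default)       // the inner flow body runs on the Default pool
            }
            .toList()
            .sortedBy { it.first }                  // merge order is not guaranteed, so sort
    }
}

fun main() {
    println(squaresOffCallerThread(listOf(3, 1, 2)))
}
```

The concurrency parameter limits how many inner flows are collected at the same time, which helps keep resource usage predictable for large collections.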
2.3. Using RxJava or RxKotlin
RxJava is a Java implementation of Reactive Extensions (ReactiveX), a library for reactive programming. RxKotlin, in turn, is a set of Kotlin extensions for RxJava:
val filteredPeople = Observable.fromIterable(people)
    .flatMap(
        {
            Observable.just(it)
                .subscribeOn(Schedulers.computation())
                .doOnNext { person ->
                    person.setAdult()
                }
        },
        people.size // Uses maxConcurrency for the number of elements
    )
    .filter { it.age > 15 }
    .toList()
    .map { it.sortedBy { person -> person.age } }
    .blockingGet()

filteredPeople.assertOver15AndSortedByAge()
First, we’ll convert the original people list into an Observable object:
Observable.fromIterable(people)
However, RxKotlin provides a more concise extension function as an alternative:
people.toObservable()
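The flatMap() operator applies a transformation to each person emitted by the Observable. In this case, it creates a new Observable that emits the same person object, and subscribeOn(Schedulers.computation()) makes each of these inner Observables run on a thread from the computation scheduler, which is what lets the elements be processed in parallel.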
Then, for controlled parallel operations, it’s highly recommended to explicitly set the maxConcurrency parameter in the flatMap(). This allows us to define the maximum number of concurrent inner Observables, ensuring predictable resource utilization.
Let’s see each operation run in a different thread in the log output:
22:39:24.557 [main] INFO - Using RxKotlin
22:39:24.561 [RxComputationThreadPool-7] INFO - Person(name=Martin, age=12, isAdult=false)
22:39:24.561 [RxComputationThreadPool-2] INFO - Person(name=Alice, age=30, isAdult=true)
22:39:24.561 [RxComputationThreadPool-8] INFO - Person(name=Ahmad, age=42, isAdult=true)
22:39:24.561 [RxComputationThreadPool-3] INFO - Person(name=Bob, age=16, isAdult=false)
22:39:24.561 [RxComputationThreadPool-1] INFO - Person(name=Alina, age=13, isAdult=false)
22:39:24.561 [RxComputationThreadPool-4] INFO - Person(name=Charlie, age=40, isAdult=true)
22:39:26.069 [main] INFO - Total time taken: 1511 ms
We can see different thread names for each operation. This indicates that the operations are running in parallel.
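The maxConcurrency parameter can also be set lower than the number of elements to cap how many inner Observables are subscribed at once. The following is a minimal sketch, assuming RxJava 3 is on the classpath (the io.reactivex.rxjava3 package); the function name and the sample values are hypothetical:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical helper: squares the numbers on the computation scheduler,
// with at most maxConcurrency inner Observables subscribed at a time.
fun squaresWithMaxConcurrency(numbers: List<Int>, maxConcurrency: Int): List<Int> =
    Observable.fromIterable(numbers)
        .flatMap(
            { n ->
                Observable.just(n)
                    .subscribeOn(Schedulers.computation()) // run this inner Observable on a pool thread
                    .map { it * it }
            },
            maxConcurrency
        )
        .toList()
        .map { it.sorted() } // flatMap does not preserve order, so sort the merged result
        .blockingGet()

fun main() {
    println(squaresWithMaxConcurrency(listOf(3, 1, 2), maxConcurrency = 2))
}
```

A smaller maxConcurrency trades some parallelism for a bounded number of active subscriptions, which can matter when the source collection is large.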
2.4. Using Java Stream API
Java 8 introduced the Stream API, a powerful mechanism for processing collections of data in a declarative, functional style.
We can call parallelStream(), which is available on Collection types such as List and Set, to create a parallel Stream from the elements of the collection:
val filteredPeople = people.parallelStream()
    .map { person ->
        person.setAdult()
        person
    }.filter { it.age > 15 }
    .sorted { p1, p2 -> p1.age.compareTo(p2.age) }
    .collect(Collectors.toList())

filteredPeople.assertOver15AndSortedByAge()
When we call parallelStream(), the elements of the Collection are divided into several sub-Stream instances.
The sub-Streams are then processed concurrently by worker threads of the common ForkJoinPool, with the calling thread taking part as well:
13:03:44.683 [main] INFO - Using Stream API
13:03:44.688 [main] INFO - Person(name=Alice, age=30, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-1] INFO - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-2] INFO - Person(name=Charlie, age=40, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-4] INFO - Person(name=Bob, age=16, isAdult=false)
13:03:44.688 [main] INFO - Person(name=Alina, age=13, isAdult=false)
13:03:44.688 [ForkJoinPool.commonPool-worker-3] INFO - Person(name=Martin, age=12, isAdult=false)
13:03:44.689 [main] INFO - Total time taken: 5 ms
Finally, the results from each sub-Stream are combined to produce the terminal result of the Stream operations.
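The splitting and recombining can be observed with a small standalone example that uses only the standard library. The following is a minimal sketch; the Reading type and the function name are hypothetical, and unlike the example above it returns copies instead of mutating the input elements:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Collectors

// Hypothetical sample type for this illustration.
data class Reading(val id: Int, val value: Int)

// Doubles each value in parallel, keeps values above 10, and sorts by value.
// Also returns the names of the threads that took part in the map step.
fun processInParallel(readings: List<Reading>): Pair<List<Reading>, Set<String>> {
    val threads = ConcurrentHashMap.newKeySet<String>() // thread-safe set
    val result = readings.parallelStream()
        .map { reading ->
            threads.add(Thread.currentThread().name)
            reading.copy(value = reading.value * 2)
        }
        .filter { it.value > 10 }
        .sorted(compareBy<Reading> { it.value })
        .collect(Collectors.toList())
    return result to threads
}

fun main() {
    val (result, threads) = processInParallel(
        listOf(Reading(1, 8), Reading(2, 3), Reading(3, 6))
    )
    println(result)
    println(threads)
}
```

Which threads take part can differ from run to run, but the content and order of the result stay the same because sorted() is applied before the elements are collected.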
2.5. Using ExecutorService
Now we’ll use ExecutorService, an interface in Java that provides a way to execute tasks (Runnable or Callable) asynchronously.
First, we create a thread pool whose size equals the number of elements in people, so that all the tasks can run at the same time:
val executor = Executors.newFixedThreadPool(people.size)
Then, we collect Future instances:
val futures: List<Future<Person>> = people
    .map { person ->
        executor.submit(Callable {
            person.setAdult()
            person
        })
    }
We map each person in the people list to a Future using executor.submit().
Each submitted task will execute the setAdult() method on the person object and return the person object itself.
Next, we’ll transform List<Future<Person>> into List<Person> by calling get() on each Future, and then filter and sort the result:
val results = futures
    .map { it.get() }
    .filter { it.age > 15 }
    .sortedBy { it.age }
Let’s take a look at the log:
16:30:52.792 [main] INFO - Using ExecutorService
16:30:52.821 [pool-1-thread-3] INFO - Person(name=Alina, age=13, isAdult=false)
16:30:52.821 [pool-1-thread-4] INFO - Person(name=Alice, age=30, isAdult=true)
16:30:52.821 [pool-1-thread-1] INFO - Person(name=Martin, age=12, isAdult=false)
16:30:52.821 [pool-1-thread-6] INFO - Person(name=Charlie, age=40, isAdult=true)
16:30:52.822 [pool-1-thread-2] INFO - Person(name=Ahmad, age=42, isAdult=true)
16:30:52.822 [pool-1-thread-5] INFO - Person(name=Bob, age=16, isAdult=false)
16:30:52.829 [main] INFO - Total time taken: 33 ms
Finally, let’s not forget to stop the ExecutorService to free up the threads:
executor.shutdown()
After shutdown() is called, the ExecutorService accepts no new tasks, but tasks that were already submitted still run to completion.
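The steps in this section can be combined into one helper that always shuts the pool down, even if a task throws. The following is a minimal sketch using only the standard library; mapWithExecutor is a hypothetical name, and the five-second timeout is an arbitrary value chosen for this illustration:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: transforms each element on a fixed thread pool
// and returns the results in the order of the original list.
fun <T, R> List<T>.mapWithExecutor(poolSize: Int, transform: (T) -> R): List<R> {
    val executor = Executors.newFixedThreadPool(poolSize)
    try {
        // Submit one task per element, then block on each Future in turn
        val futures = map { item -> executor.submit(Callable { transform(item) }) }
        return futures.map { it.get() }
    } finally {
        executor.shutdown()                             // stop accepting new tasks
        executor.awaitTermination(5, TimeUnit.SECONDS)  // wait for running tasks to finish
    }
}

fun main() {
    println(listOf(1, 2, 3).mapWithExecutor(poolSize = 2) { it * 10 })
}
```

Placing shutdown() in a finally block means the threads are released on both the success and the failure path.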
3. Conclusion
In this article, we discussed various approaches to perform parallel operations on Kotlin collections.
Coroutines and Kotlin Flow with their expressive Kotlin style can do this well. If we want to use third-party libraries, RxJava or RxKotlin are mature and reliable alternatives too. Alternatively, Java also has APIs for handling this, such as Stream and ExecutorService.
As always, the source code for the examples is available over on GitHub.