1. Overview
In this tutorial, we’re going to review the use of Reactive Extensions (Rx) in idiomatic Kotlin using the RxKotlin library.
RxKotlin is not an implementation of Reactive Extensions, per se. Instead, it’s mostly a collection of extension methods. That is, RxKotlin augments the RxJava library with an API designed with Kotlin in mind.
Therefore, we’ll use concepts from our article, Introduction to RxJava, as well as the concept of Flowables we’ve presented in a dedicated article.
2. RxKotlin Setup
To use RxKotlin in our Maven project, we’ll need to add the rxkotlin dependency to our pom.xml:
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.3.0</version>
</dependency>
Or, for a Gradle project, to our build.gradle:
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
Here, we’re using RxKotlin 2.x, which targets RxJava 2. Projects using RxJava 1 should use RxKotlin 1.x. The same concepts apply to both versions.
Note that RxKotlin depends on RxJava, but its declared dependency doesn’t always track the latest RxJava release. So, we recommend explicitly including the specific RxJava version we’re going to depend on, as detailed in our RxJava article.
3. Creating Observables in RxKotlin
RxKotlin includes a number of extension methods to create Observable and Flowable objects from collections.
In particular, arrays, Iterables, and Sequences all have a toObservable() method and a toFlowable() method. Here, we convert a List:
val observable = listOf(1, 1, 2, 3).toObservable()
observable.test().assertValues(1, 1, 2, 3)
val flowable = listOf(1, 1, 2, 3).toFlowable()
flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))
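The same extension methods should also work on primitive arrays and on sequences. The following is a minimal sketch, assuming RxKotlin 2.x on the classpath; the function names valuesFromIntArray and valuesFromSequence are made up for illustration:

```kotlin
import io.reactivex.rxkotlin.toFlowable
import io.reactivex.rxkotlin.toObservable

// Hypothetical helper: collects everything a primitive IntArray emits
// once it has been converted to an Observable.
fun valuesFromIntArray(): List<Int> =
    intArrayOf(1, 2, 3).toObservable().toList().blockingGet()

// Hypothetical helper: same idea for a lazily evaluated Sequence,
// this time going through a Flowable.
fun valuesFromSequence(): List<String> =
    sequenceOf("a", "b").toFlowable().toList().blockingGet()

fun main() {
    println(valuesFromIntArray())  // [1, 2, 3]
    println(valuesFromSequence())  // [a, b]
}
```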
3.1. Completables
RxKotlin also provides some methods to create Completable instances. In particular, *we can convert Actions, Callables, Futures, and zero-arity functions to Completable with the extension method toCompletable:*
var value = 0
val completable = { value = 3 }.toCompletable()
assertFalse(completable.test().isCancelled())
assertEquals(3, value)
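It's worth noting that the resulting Completable is lazy: the wrapped code runs when something subscribes, not when toCompletable is called. Here's a small sketch that illustrates this with an RxJava Action; runActionCompletable is a hypothetical name used only for this example:

```kotlin
import io.reactivex.functions.Action
import io.reactivex.rxkotlin.toCompletable

// Hypothetical helper: returns the counter value observed before and after
// subscribing to a Completable built from an Action.
fun runActionCompletable(): Pair<Int, Int> {
    var calls = 0
    val completable = Action { calls += 1 }.toCompletable()
    val before = calls                   // the Action hasn't run yet
    completable.test().assertComplete()  // subscribing triggers the Action
    return before to calls
}

fun main() {
    println(runActionCompletable())  // (0, 1)
}
```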
4. Observable and Flowable to Map and Multimap
When we have an Observable or Flowable that produces Pair instances, we can transform them into a Single observable that produces a Map:
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4))
val observable = list.toObservable()
val map = observable.toMap()
assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())
As we can see in the previous example, toMap overwrites values emitted earlier with later values if they have the same key.
If we want to accumulate all the values associated with a key into a collection, we use toMultimap instead:
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4))
val observable = list.toObservable()
val map = observable.toMultimap()
assertEquals(
mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))),
map.blockingGet())
5. Combining Observables and Flowables
One of the selling points of Rx is the possibility to combine Observables and Flowables in various ways. Indeed, RxJava provides a number of operators out of the box.
In addition to that, RxKotlin includes a few more extension methods for combining Observables and the like.
5.1. Combining Observable Emissions
When we have an Observable that emits other Observables, we can use one of the extension methods in RxKotlin to combine together the emitted values.
In particular, mergeAll combines the observables with flatMap:
val subject = PublishSubject.create<Observable<String>>()
val observable = subject.mergeAll()
Which would be the same as:
val observable = subject.flatMap { it }
The resulting Observable will emit all the values of the source Observables in an unspecified order.
Similarly, concatAll uses concatMap, so the values are emitted in the same order as the sources, while switchLatest uses switchMap, so values are emitted only from the most recently emitted Observable.
As with the other extension methods we’ve seen so far, all of the above are available for Flowable sources as well, with the same semantics.
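To make the difference between concatAll and switchLatest more concrete, here's a small sketch, assuming RxKotlin 2.x. The helper names concatAllValues and switchLatestValues are invented for this illustration:

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.concatAll
import io.reactivex.rxkotlin.switchLatest
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: concatAll drains each inner Observable in turn,
// so the source order is preserved.
fun concatAllValues(): List<String> =
    Observable.just(Observable.just("a", "b"), Observable.just("c"))
        .concatAll()
        .toList()
        .blockingGet()

// Hypothetical helper: switchLatest only listens to the most recent inner Observable.
fun switchLatestValues(): List<String> {
    val first = PublishSubject.create<String>()
    val second = PublishSubject.create<String>()
    val outer = PublishSubject.create<Observable<String>>()
    val observer = outer.switchLatest().test()

    outer.onNext(first)
    first.onNext("a1")
    outer.onNext(second)  // from here on, only `second` is observed
    first.onNext("a2")    // ignored: `first` is no longer the latest source
    second.onNext("b1")

    return observer.values()
}

fun main() {
    println(concatAllValues())     // [a, b, c]
    println(switchLatestValues())  // [a1, b1]
}
```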
5.2. Combining Completables, Maybes, and Singles
When we have an Observable that emits instances of Completable, Maybe, or Single, we can combine those with the appropriate mergeAllXs method like, for example, mergeAllMaybes:
val subject = PublishSubject.create<Maybe<Int>>()
val observable = subject.mergeAllMaybes()
// Subscribe first: a PublishSubject doesn't replay earlier emissions to late subscribers
val testObserver = observable.test()
subject.onNext(Maybe.just(1))
subject.onNext(Maybe.just(2))
subject.onNext(Maybe.empty())
subject.onNext(Maybe.error(Exception("error")))
subject.onNext(Maybe.just(3))
testObserver.assertValues(1, 2).assertError(Exception::class.java)
5.3. Combining Iterables of Observables
For collections of Observable or Flowable instances instead, RxKotlin has a couple of other operators, merge and mergeDelayError. They both have the effect of *combining all the Observables or Flowables into one that will emit all the values in sequence:*
val observables = mutableListOf(Observable.just("first", "second"))
val observable = observables.merge()
observables.add(Observable.just("third", "fourth"))
observable.test().assertValues("first", "second", "third", "fourth")
The difference between the two operators, which are directly derived from the same-named operators in RxJava, is their treatment of errors.
The merge method emits errors as soon as they’re emitted by the source, so values from later sources, such as "fifth" below, are never delivered:
// ...
observables.add(Observable.error(Exception("e")))
observables.add(Observable.just("fifth"))
// ...
observable.test().assertValues("first", "second", "third", "fourth")
Whereas mergeDelayError emits them at the end of the stream:
// ...
observables.add(Observable.error(Exception("e")))
observables.add(Observable.just("fifth"))
// ...
observable.test().assertValues("first", "second", "third", "fourth", "fifth")
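Since the snippets above are abbreviated, the following self-contained sketch puts the two operators side by side on the same sources. It assumes RxKotlin 2.x; the helpers sources, valuesWithMerge, and valuesWithMergeDelayError are hypothetical names, not part of the library:

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.merge
import io.reactivex.rxkotlin.mergeDelayError

// Hypothetical fixture: three sources, the middle one fails immediately.
fun sources(): List<Observable<String>> = listOf(
    Observable.just("first"),
    Observable.error<String>(IllegalStateException("e")),
    Observable.just("second")
)

// merge stops at the first error, so "second" is never emitted.
fun valuesWithMerge(): List<String> =
    sources().merge().test()
        .assertError(IllegalStateException::class.java)
        .values()

// mergeDelayError lets every source finish and only then signals the error.
fun valuesWithMergeDelayError(): List<String> =
    sources().mergeDelayError().test()
        .assertError(IllegalStateException::class.java)
        .values()

fun main() {
    println(valuesWithMerge())            // [first]
    println(valuesWithMergeDelayError())  // [first, second]
}
```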
6. Handling Values of Different Types
Let’s now look at the extension methods in RxKotlin for dealing with values of different types.
These are variants of RxJava methods that make use of Kotlin’s reified generics. In particular, we can:
- cast emitted values from one type to another, or
- filter out values that are not of a certain type
So, we could, for example, cast an Observable of Numbers to one of Ints:
val observable = Observable.just<Number>(1, 1, 2, 3)
observable.cast<Int>().test().assertValues(1, 1, 2, 3)
Here, the cast is unnecessary. However, when combining different observables together, we might need it.
With ofType, instead, we can filter out values that aren’t of the type we expect:
val observable = Observable.just(1, "and", 2, "and")
observable.ofType<Int>().test().assertValues(1, 2)
As always, cast and ofType are applicable to both Observables and Flowables.
Furthermore, Maybe supports these methods as well. The Single class, instead, only supports cast.
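As a quick illustration of the Single and Maybe variants, here's a sketch assuming RxKotlin 2.x; castSingle and filterMaybe are hypothetical helper names:

```kotlin
import io.reactivex.Maybe
import io.reactivex.Single
import io.reactivex.rxkotlin.cast
import io.reactivex.rxkotlin.ofType

// Hypothetical helper: narrows a Single<Any> to a Single<String>.
fun castSingle(): String =
    Single.just<Any>("text").cast<String>().blockingGet()

// Hypothetical helper: the Maybe holds an Int, so filtering for String
// leaves it empty, and blockingGet() returns null for an empty Maybe.
fun filterMaybe(): String? =
    Maybe.just<Any>(42).ofType<String>().blockingGet()

fun main() {
    println(castSingle())   // text
    println(filterMaybe())  // null
}
```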
7. Other Helper Methods
Finally, RxKotlin includes several helper methods. Let’s have a quick look.
We can use subscribeBy instead of subscribe – it allows named parameters:
Observable.just(1).subscribeBy(onNext = { println(it) })
Similarly, for blocking subscriptions we can use blockingSubscribeBy.
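Named parameters become more useful once we handle more than one kind of event. Below is a small sketch, assuming RxKotlin 2.x, that records every callback; recordEvents is a hypothetical helper written for this example:

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

// Hypothetical helper: subscribes with named callbacks and records what happened.
// The sources used below emit synchronously, so the list is complete on return.
fun recordEvents(source: Observable<Int>): List<String> {
    val events = mutableListOf<String>()
    source.subscribeBy(
        onNext = { events.add("next $it") },
        onError = { events.add("error ${it.message}") },
        onComplete = { events.add("complete") }
    )
    return events
}

fun main() {
    println(recordEvents(Observable.just(1, 2)))
    // [next 1, next 2, complete]
    println(recordEvents(Observable.error<Int>(RuntimeException("boom"))))
    // [error boom]
}
```

Because each callback is named, we can pass them in any order and leave out the ones we don't need.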
Additionally, RxKotlin includes some methods that mimic those in RxJava but work around a limitation of Kotlin’s type inference.
For example, when using Observable#zip, specifying the zipper doesn’t look so great:
Observable.zip(Observable.just(1), Observable.just(2), BiFunction<Int, Int, Int> { a, b -> a + b })
So, RxKotlin adds Observables#zip for more idiomatic usage:
Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }
Notice the final “s” in Observables. Similarly, we have Flowables, Singles, and Maybes.
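For instance, the Singles counterpart can be used in much the same way. This is a minimal sketch, assuming RxKotlin 2.x; zipSingles is a hypothetical helper name:

```kotlin
import io.reactivex.Single
import io.reactivex.rxkotlin.Singles

// Hypothetical helper: zips two Singles of different types with a plain Kotlin lambda,
// without spelling out a BiFunction.
fun zipSingles(): String =
    Singles.zip(Single.just("a"), Single.just(1)) { letter, number -> "$letter$number" }
        .blockingGet()

fun main() {
    println(zipSingles())  // a1
}
```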
8. Conclusions
In this article, we’ve thoroughly reviewed the RxKotlin library, which augments RxJava to make its API look more like idiomatic Kotlin.
For further information, please refer to the RxKotlin GitHub page. For more examples, we recommend RxKotlin tests.
The implementation of all these examples and code snippets can be found in the GitHub project as a Maven and Gradle project, so it should be easy to import and run as is.