1. Overview
In this article, we’ll discover some utility operators for working with Observables in RxJava and how to implement custom ones.
An operator is a function that takes an upstream Observable and alters its behavior.
Operators wrap existing Observables and enhance them typically by intercepting subscription. This might sound complicated, but it is actually quite flexible and not that difficult to grasp.
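To make the idea of "intercepting subscription" concrete, here is a minimal sketch in plain Java that does not use RxJava at all. MiniSource, MiniOperators, range and doOnNext are hypothetical names introduced only for this illustration; the real library also handles errors, completion, unsubscription and backpressure, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable: it only knows how to push items to a consumer.
interface MiniSource<T> {
    void subscribe(Consumer<T> onNext);
}

final class MiniOperators {

    // A fixed source that emits start, start + 1, ... (count items) on every subscription.
    static MiniSource<Integer> range(int start, int count) {
        return onNext -> {
            for (int i = start; i < start + count; i++) {
                onNext.accept(i);
            }
        };
    }

    // The "operator": it returns a new source. When someone subscribes to it,
    // it subscribes to the upstream and runs the action before forwarding each item.
    static <T> MiniSource<T> doOnNext(MiniSource<T> upstream, Consumer<T> action) {
        return downstream -> upstream.subscribe(item -> {
            action.accept(item);
            downstream.accept(item);
        });
    }

    // Returns { sum seen by the side effect, number of items that reached the subscriber }.
    static int[] demo() {
        int[] total = {0};
        List<Integer> received = new ArrayList<>();
        doOnNext(range(1, 10), i -> total[0] += i).subscribe(received::add);
        return new int[] {total[0], received.size()};
    }
}
```

Nothing happens until subscribe is called on the wrapped source, which is the same lazy behavior the operators in the following sections rely on.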
2. Do
Several operators let us register actions that run on Observable lifecycle events.
The doOnNext operator modifies the Observable source so that it invokes an action whenever onNext is called.
The doOnCompleted operator registers an action which is called if the resulting Observable terminates normally, calling the Observer’s onCompleted method:
Observable.range(1, 10)
.doOnNext(r -> receivedTotal += r)
.doOnCompleted(() -> result = "Completed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));
The doOnEach operator modifies the Observable source so that it notifies an Observer of every notification: each emitted item as well as the terminal completion or error.
The doOnSubscribe operator registers an action which is called whenever an Observer subscribes to the resulting Observable.
There’s also the doOnUnsubscribe operator, which does the opposite of doOnSubscribe and registers an action which is called whenever an Observer unsubscribes:
Observable.range(1, 10)
.doOnEach(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
receivedTotal += value;
}
})
.doOnSubscribe(() -> result = "Subscribed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));
When an Observable completes with an error, we can use the doOnError operator to perform an action.
The doOnTerminate operator registers an action that will be invoked just before an Observable terminates, either successfully or with an error:
thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
.single()
.doOnError(throwable -> { throw new RuntimeException("error");})
.doOnTerminate(() -> result += "doOnTerminate")
.doAfterTerminate(() -> result += "_doAfterTerminate")
.subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));
There’s also a finallyDo operator, which was deprecated in favor of doAfterTerminate. It registers an action which is called right after an Observable terminates, whether normally or with an error.
3. ObserveOn vs SubscribeOn
By default, an Observable along with the operator chain will operate on the same thread on which its subscribe method is called.
The observeOn operator specifies a different Scheduler that the Observable will use for sending notifications to Observers:
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.observeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
We see that elements were produced in the main thread and were pushed all the way to the first map call.
But after that, the observeOn redirected the processing to a computation thread, which was used when processing map and the final Subscriber.
One problem that may arise with observeOn is that the upstream can produce emissions faster than the downstream, now running on another thread, can process them. This can cause backpressure issues that we may have to consider.
To specify on which Scheduler the Observable should operate, we can use the subscribeOn operator:
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
The subscribeOn operator instructs the source Observable which thread to use for emitting items, and only this thread will push items to the Subscriber. It can be placed anywhere in the stream because it affects the subscription only.
Effectively, only one subscribeOn takes effect (when several are chained, the one closest to the source wins), but we can have any number of observeOn operators. We can switch emissions from one thread to another with ease by using observeOn.
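The thread hand-off performed by observeOn can be modeled in plain Java with an ExecutorService. The sketch below is only an approximation under stated assumptions: PushSource and ObserveOnSketch are hypothetical names, the executor stands in for a Scheduler, and error handling, completion and backpressure are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable that only pushes items.
interface PushSource<T> {
    void subscribe(Consumer<T> onNext);
}

final class ObserveOnSketch {

    // Wraps the upstream so that each item is handed to the executor
    // instead of being delivered on the emitting thread.
    static <T> PushSource<T> observeOn(PushSource<T> upstream, ExecutorService executor) {
        return onNext -> upstream.subscribe(
            item -> executor.execute(() -> onNext.accept(item)));
    }

    // Returns two sets: the names of the emitting threads and of the receiving threads.
    static List<Set<String>> demo() {
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        Set<String> emitThreads = ConcurrentHashMap.newKeySet();
        Set<String> receiveThreads = ConcurrentHashMap.newKeySet();

        PushSource<Integer> source = onNext -> {
            for (int i = 1; i <= 5; i++) {
                emitThreads.add(Thread.currentThread().getName());
                onNext.accept(i);
            }
        };

        observeOn(source, worker)
            .subscribe(i -> receiveThreads.add(Thread.currentThread().getName()));

        worker.shutdown(); // no new tasks are accepted, queued deliveries still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList(emitThreads, receiveThreads);
    }
}
```

In this model the source still emits on the calling thread and only delivery moves to the worker. A comparable model of subscribeOn would instead run the whole upstream.subscribe call on the executor, which is why its position in the chain does not matter.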
4. Single and SingleOrDefault
The operator Single returns an Observable that emits the single item emitted by the source Observable:
Observable.range(1, 1)
.single()
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);
If the source Observable produces zero or more than one element, an exception will be thrown:
Observable.empty()
.single()
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
On the other hand, the operator SingleOrDefault is very similar to Single, meaning that it also returns an Observable that emits the single item from the source, but additionally, we can specify a default value:
Observable.empty()
.singleOrDefault("Default")
.subscribe(i -> result +=i);
assertTrue(result.equals("Default"));
But if the Observable source emits more than one item, it still throws an IllegalArgumentException:
Observable.range(1, 3)
.singleOrDefault(5)
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
A simple conclusion:
- If it is expected that the source Observable may have none or one element, then SingleOrDefault should be used
- If we’re dealing with potentially more than one item emitted in our Observable and we only want to emit either the first or the last value, we can use other operators like first or last
5. Timestamp
The Timestamp operator attaches a timestamp to each item emitted by the source Observable before reemitting that item in its own sequence. The timestamp indicates at what time the item was emitted:
Observable.range(1, 10)
.timestamp()
.map(o -> result = o.getClass().toString() )
.last()
.subscribe();
assertTrue(result.equals("class rx.schedulers.Timestamped"));
6. Delay
This operator modifies its source Observable by pausing for a particular increment of time before emitting each of the source Observable’s items.
It offsets the entire sequence using the provided value:
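// interval emits Long values and timestamp() wraps each one in rx.schedulers.Timestamped
Observable<Timestamped<Long>> source = Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.timestamp();
Observable<Timestamped<Long>> delayedObservable
= source.delay(2, TimeUnit.SECONDS);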
source.subscribe(
value -> System.out.println("source :" + value),
t -> System.out.println("source error"),
() -> System.out.println("source completed"));
delayedObservable.subscribe(
value -> System.out.println("delay : " + value),
t -> System.out.println("delay error"),
() -> System.out.println("delay completed"));
Thread.sleep(8000);
There is an alternative operator called delaySubscription, with which we can delay the subscription to the source Observable rather than its emissions.
The delay operator runs on the computation Scheduler by default, but we can choose a different Scheduler by passing it in as an optional third parameter; the same applies to delaySubscription.
7. Repeat
The repeat operator simply intercepts the completion notification from upstream and, rather than passing it downstream, resubscribes.
Therefore, it is not guaranteed that repeat will keep cycling through the same sequence of events, but it happens to be the case when upstream is a fixed stream:
Observable.range(1, 3)
.repeat(3)
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 18);
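The resubscription behavior, and why the repeated sequence is not guaranteed to be identical, can be sketched in plain Java without RxJava. RepeatableSource and RepeatSketch are hypothetical names used only for this illustration, and the model is synchronous with no error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable with item and completion callbacks.
interface RepeatableSource<T> {
    void subscribe(Consumer<T> onNext, Runnable onCompleted);
}

final class RepeatSketch {

    // Subscribes to the upstream up to `times` times in total.
    static <T> RepeatableSource<T> repeat(RepeatableSource<T> upstream, int times) {
        return (onNext, onCompleted) -> {
            if (times <= 0) {
                onCompleted.run();
                return;
            }
            subscribeRepeating(upstream, times, onNext, onCompleted);
        };
    }

    // Intercepts the upstream completion: resubscribes while repetitions remain,
    // and passes completion downstream only after the last one.
    private static <T> void subscribeRepeating(RepeatableSource<T> upstream, int remaining,
                                               Consumer<T> onNext, Runnable onCompleted) {
        upstream.subscribe(onNext, () -> {
            if (remaining > 1) {
                subscribeRepeating(upstream, remaining - 1, onNext, onCompleted);
            } else {
                onCompleted.run();
            }
        });
    }

    // The source emits a different value on each subscription; -1 marks completion.
    static List<Integer> demo() {
        int[] subscriptions = {0};
        RepeatableSource<Integer> counting = (onNext, onCompleted) -> {
            subscriptions[0]++;
            onNext.accept(subscriptions[0] * 10);
            onCompleted.run();
        };
        List<Integer> received = new ArrayList<>();
        repeat(counting, 3).subscribe(received::add, () -> received.add(-1));
        return received;
    }
}
```

Because the source here depends on how often it has been subscribed to, the three cycles deliver 10, 20 and 30 rather than the same value three times, and the downstream sees a single completion at the end.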
8. Cache
The cache operator stands between subscribe and our custom Observable.
When the first subscriber appears, cache delegates subscription to the underlying Observable and forwards all notifications (events, completions, or errors) downstream.
However, at the same time, it keeps a copy of all notifications internally. When a subsequent subscriber wants to receive pushed notifications, cache no longer delegates to the underlying Observable but instead feeds cached values:
Observable<Integer> source =
Observable.<Integer>create(subscriber -> {
System.out.println("Create");
subscriber.onNext(receivedTotal += 5);
subscriber.onCompleted();
}).cache();
source.subscribe(i -> {
System.out.println("element 1");
receivedTotal += 1;
});
source.subscribe(i -> {
System.out.println("element 2");
receivedTotal += 2;
});
assertTrue(receivedTotal == 8);
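The delegate-once-then-replay behavior described above can be approximated in a few lines of plain Java. This is only a sketch under simplifying assumptions: ColdSource and CacheSketch are hypothetical names, the model is synchronous and not thread-safe, and completion and error notifications are not recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold Observable: it does its work again on each subscription.
interface ColdSource<T> {
    void subscribe(Consumer<T> onNext);
}

final class CacheSketch {

    static <T> ColdSource<T> cache(ColdSource<T> upstream) {
        List<T> buffer = new ArrayList<>();
        boolean[] subscribed = {false};
        return onNext -> {
            if (!subscribed[0]) {
                subscribed[0] = true;
                // First subscriber: delegate to the upstream and remember every item.
                upstream.subscribe(item -> {
                    buffer.add(item);
                    onNext.accept(item);
                });
            } else {
                // Later subscribers: replay from the buffer without touching the upstream.
                buffer.forEach(onNext);
            }
        };
    }

    // Returns { how often the upstream ran, total accumulated by both subscribers }.
    static int[] demo() {
        int[] creations = {0};
        ColdSource<Integer> raw = onNext -> {
            creations[0]++;
            onNext.accept(5);
        };
        ColdSource<Integer> source = cache(raw);

        int[] total = {0};
        source.subscribe(i -> total[0] += i + 1);
        source.subscribe(i -> total[0] += i + 2);
        return new int[] {creations[0], total[0]};
    }
}
```

Both subscribers receive the value 5, but the upstream body runs only once, which mirrors the single "Create" line printed by the RxJava example.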
9. Using
When an observer subscribes to the Observable returned from the using(), it’ll use the Observable factory function to create the Observable the observer will… observe, while at the same time using the resource factory function to create whichever resource we’ve designed it to make.
When the observer unsubscribes from the Observable, or when the Observable terminates, using will call the third function to dispose of the created resource:
Observable<Character> values = Observable.using(
() -> "resource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result += v,
e -> result += e
);
assertTrue(result.equals("resource"));
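The three-function contract of using can be modeled in plain Java with a try/finally block. The names CharSource and UsingSketch are hypothetical, and the sketch assumes a synchronous source, so "termination" simply means that the subscribe call has returned or thrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for an Observable<Character> that only pushes items.
interface CharSource {
    void subscribe(Consumer<Character> onNext);
}

final class UsingSketch {

    static <R> CharSource using(Supplier<R> resourceFactory,
                                Function<R, CharSource> sourceFactory,
                                Consumer<R> disposeAction) {
        return onNext -> {
            // A fresh resource is created for every subscription.
            R resource = resourceFactory.get();
            try {
                sourceFactory.apply(resource).subscribe(onNext);
            } finally {
                // Runs whether the source finished normally or threw.
                disposeAction.accept(resource);
            }
        };
    }

    // Returns the collected characters followed by the dispose log entries.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        StringBuilder result = new StringBuilder();

        CharSource values = UsingSketch.<String>using(
            () -> "resource",
            r -> onNext -> {
                for (char c : r.toCharArray()) {
                    onNext.accept(c);
                }
            },
            r -> log.add("Disposed: " + r));

        values.subscribe(c -> result.append(c.charValue()));

        List<String> out = new ArrayList<>();
        out.add(result.toString());
        out.addAll(log);
        return out;
    }
}
```

The dispose action runs exactly once per subscription, after the last character has been delivered, which corresponds to the "Disposed: resource" line printed by the RxJava version.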
10. Conclusion
In this article, we talked about how to use RxJava utility operators and explored their most important features.
The true power of RxJava lies in its operators. Declarative transformations of streams of data are safe yet expressive and flexible.
With a strong foundation in functional programming, operators play a deciding role in RxJava adoption. Mastering the built-in operators is key to success with this library.
The full source code for the project including all the code samples used here can be found over on GitHub.