1. Overview
In this article, we’ll look at the different types of Schedulers we can use to write multithreaded programs with RxJava, based on the Observable’s subscribeOn and observeOn methods.
Schedulers let us specify where, and possibly when, to execute the tasks related to an Observable chain.
We can obtain a Scheduler from the factory methods of the Schedulers class.
2. Default Threading Behavior
By default, Rx is single-threaded, which means that an Observable and the chain of operators applied to it notify observers on the same thread on which subscribe() is called.
The observeOn and subscribeOn methods take a Scheduler as an argument. As the name suggests, a Scheduler is a tool for scheduling individual actions.
We’ll use a Scheduler through its createWorker method, which returns a Scheduler.Worker. A worker accepts actions and executes them sequentially on a single thread.
In a way, a worker is a Scheduler itself, but we’ll not refer to it as a Scheduler to avoid confusion.
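To make the "sequentially on a single thread" contract concrete, here is a minimal JDK-only sketch. It is an analogy, not RxJava code: a single-threaded ExecutorService stands in for a Scheduler.Worker, and the class name SequentialWorkerSketch and the thread name "worker-thread" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RxJava.
final class SequentialWorkerSketch {

    // Submits three actions to one single-threaded executor and returns
    // "<index>@<thread name>" entries in the order the actions ran.
    static List<String> runThreeActions() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "worker-thread"));
        for (int i = 1; i <= 3; i++) {
            int index = i;
            worker.execute(() ->
                log.add(index + "@" + Thread.currentThread().getName()));
        }
        worker.shutdown(); // accept no new actions, let queued ones finish
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runThreeActions());
    }
}
```

All three entries carry the same thread name and appear in submission order, which is the behavior we rely on when scheduling actions on a worker.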
2.1. Scheduling an Action
We can schedule a job on any Scheduler by creating a new worker and scheduling some actions:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
Assert.assertTrue(result.equals("action"));
The action is then queued on the thread that the worker is assigned to.
2.2. Canceling an Action
Scheduler.Worker implements Subscription. Calling the unsubscribe method on a worker empties the queue and cancels all pending tasks. We can see that by example:
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += "First_Action";
worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
// give the worker thread time to run the first action before asserting
Thread.sleep(500);
Assert.assertTrue(result.equals("First_Action"));
The second task is never executed because the one before it canceled the whole operation. Actions that were in the process of being executed will be interrupted.
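The same "first action cancels everything behind it" pattern can be sketched with the JDK alone. This is only an analogy: shutdownNow() on a single-threaded executor plays the role of unsubscribe(), and the CountDownLatch is added so the second action is guaranteed to be queued before the first one runs. The class name CancelPendingSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RxJava.
final class CancelPendingSketch {

    static String run() {
        StringBuffer result = new StringBuffer(); // thread-safe accumulator
        CountDownLatch bothQueued = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();

        worker.execute(() -> {
            try {
                bothQueued.await(); // wait until the second action is queued
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            result.append("First_Action");
            worker.shutdownNow(); // discards every action still in the queue
        });
        worker.execute(() -> result.append("Second_Action"));
        bothQueued.countDown();

        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first action contributes to the result, because the second one is still waiting in the queue when the queue is discarded.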
3. Schedulers.newThread
This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn().
It’s hardly ever a good choice, not only because of the latency involved when starting a thread but also because this thread is not reused:
Observable.just("Hello")
.observeOn(Schedulers.newThread())
.doOnNext(s ->
result2 += Thread.currentThread().getName()
)
.observeOn(Schedulers.newThread())
.subscribe(s ->
result1 += Thread.currentThread().getName()
);
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));
When the Worker is done, the thread simply terminates. This Scheduler is appropriate only when tasks are coarse-grained: each takes a long time to complete, but there are so few of them that threads are unlikely to be reused at all.
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
"RxNewThreadScheduler-1_Start_End_worker_"));
When we scheduled the worker on a NewThreadScheduler, we saw that the worker was bound to a particular thread.
4. Schedulers.immediate
Schedulers.immediate is a special scheduler that invokes a task on the client thread in a blocking way, rather than asynchronously, and returns when the action completes:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
"main_Start_worker__End"));
In fact, subscribing to an Observable via immediate Scheduler typically has the same effect as not subscribing with any particular Scheduler at all:
Observable.just("Hello")
.subscribeOn(Schedulers.immediate())
.subscribe(s ->
result += Thread.currentThread().getName()
);
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));
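The nested-scheduling result from the worker example in this section can be reproduced without RxJava. As a rough analogy (not how RxJava implements it), an Executor that simply calls run() on the caller’s thread behaves like the immediate worker: a nested action runs inline, in the middle of the outer one. The class name ImmediateSketch is made up for this sketch.

```java
import java.util.concurrent.Executor;

// Hypothetical illustration class; not part of RxJava.
final class ImmediateSketch {

    static String run() {
        StringBuilder result = new StringBuilder();
        // A "direct" executor: every action runs at once on the calling thread.
        Executor immediate = Runnable::run;

        immediate.execute(() -> {
            result.append("_Start");
            // The nested action runs right here, before "_End" is appended.
            immediate.execute(() -> result.append("_worker_"));
            result.append("_End");
        });
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The nested action lands between "_Start" and "_End", the same ordering the immediate worker produced above.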
5. Schedulers.trampoline
The trampoline Scheduler is very similar to immediate because it also schedules tasks in the same thread, effectively blocking.
However, the upcoming task is executed when all previously scheduled tasks complete:
Observable.just(2, 4, 6, 8)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));
Immediate invokes a given task right away, whereas trampoline waits for the current task to finish.
The trampoline’s worker executes every task on the thread that scheduled the first task. The first call to schedule blocks until the queue is emptied:
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "Start";
worker.schedule(() -> {
result += "_middleStart";
worker.schedule(() ->
result += "_worker_"
);
result += "_middleEnd";
});
result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
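The ordering above follows from a simple queue-and-drain loop. The sketch below is a deliberately simplified, single-threaded model of that idea. It is not RxJava’s actual trampoline implementation, and the class name TrampolineSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration class; not part of RxJava and not thread-safe.
final class TrampolineSketch {

    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    void schedule(Runnable action) {
        queue.addLast(action);
        if (draining) {
            return; // an outer schedule() call is already draining the queue
        }
        draining = true;
        try {
            Runnable next;
            // The first caller blocks here until the queue is empty.
            while ((next = queue.pollFirst()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    static String run() {
        StringBuilder result = new StringBuilder();
        TrampolineSketch worker = new TrampolineSketch();
        worker.schedule(() -> {
            result.append("Start");
            worker.schedule(() -> {
                result.append("_middleStart");
                worker.schedule(() -> result.append("_worker_"));
                result.append("_middleEnd");
            });
            result.append("_mainEnd");
        });
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each nested schedule() call only enqueues its action, so an action always runs to completion before the next one starts.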
6. Schedulers.from
Schedulers are internally more complex than Executors from java.util.concurrent – so a separate abstraction was needed.
But because they are conceptually quite similar, unsurprisingly there is a wrapper that can turn Executor into Scheduler using the from factory method:
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern)
.build();
}
@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements()
throws InterruptedException {
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(
10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Alfa");
subscriber.onNext("Beta");
subscriber.onCompleted();
});
observable
.subscribeOn(schedulerA)
.subscribeOn(schedulerB)
.subscribe(
x -> result += Thread.currentThread().getName() + x + "_",
Throwable::printStackTrace,
() -> result += "_Completed"
);
Thread.sleep(2000);
Assert.assertTrue(result.equals(
"Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}
schedulerB is used only briefly: it merely schedules a new action on schedulerA, which does all the work. Thus, additional subscribeOn calls are not only ineffective but also introduce a small overhead.
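One way to picture this hand-off is with two plain thread pools. The sketch below is a JDK-only analogy, not RxJava code: the outer pool’s only job is to submit the work to the inner pool, so the work itself always runs on an inner-pool thread. The class name NestedHandOffSketch is hypothetical, and its threadFactory helper is a stand-in for the Guava ThreadFactoryBuilder used in the test above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of RxJava.
final class NestedHandOffSketch {

    // JDK-only replacement for a name-format thread factory ("Sched-A-%d").
    static ThreadFactory threadFactory(String pattern) {
        AtomicInteger counter = new AtomicInteger();
        return r -> new Thread(r, String.format(pattern, counter.getAndIncrement()));
    }

    static String run() {
        ExecutorService poolA = Executors.newFixedThreadPool(2, threadFactory("Sched-A-%d"));
        ExecutorService poolB = Executors.newFixedThreadPool(2, threadFactory("Sched-B-%d"));
        CompletableFuture<String> workThread = new CompletableFuture<>();
        try {
            // Outer hop (B) does nothing except hand the work to the inner hop (A).
            poolB.execute(() ->
                poolA.execute(() ->
                    workThread.complete(Thread.currentThread().getName())));
            return workThread.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            poolA.shutdown();
            poolB.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded thread belongs to pool A; pool B contributed one extra thread hop and nothing else.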
7. Schedulers.io
This Scheduler is similar to newThread, except that already started threads are recycled and can handle future requests.
This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:
Observable.just("io")
.subscribeOn(Schedulers.io())
.subscribe(i -> result += Thread.currentThread().getName());
// the subscription runs asynchronously, so wait before asserting
Thread.sleep(500);
Assert.assertTrue(result.equals("RxIoScheduler-2"));
We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, io scheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.
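The risk is easy to demonstrate with a plain ThreadPoolExecutor configured like Executors.newCachedThreadPool(): no upper bound on threads and a 60-second idle timeout. This is a JDK-only sketch of the failure mode, not the io scheduler itself. The class name UnboundedPoolSketch is hypothetical, and the latch simulates a dependency that does not answer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RxJava.
final class UnboundedPoolSketch {

    // Submits `calls` tasks that all hang on a slow dependency and returns
    // how many threads the pool had to start to accommodate them.
    static int threadsStartedFor(int calls) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,           // no core threads, no upper bound
            60L, TimeUnit.SECONDS,          // idle threads are retired after 60 s
            new SynchronousQueue<>());      // no queueing: hand off or start a thread
        CountDownLatch slowDependency = new CountDownLatch(1);
        try {
            for (int i = 0; i < calls; i++) {
                pool.execute(() -> {
                    try {
                        slowDependency.await(); // simulate a call that hangs
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize(); // one thread per blocked call
        } finally {
            slowDependency.countDown(); // release the blocked tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsStartedFor(5));
    }
}
```

Every blocked call costs one thread, so the thread count grows linearly with the number of outstanding slow calls.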
In practice, Schedulers.io is almost always a better choice than Schedulers.newThread, because it reuses threads that have already been started.
8. Schedulers.computation
By default, the computation Scheduler limits the number of threads running in parallel to the value of Runtime.getRuntime().availableProcessors().
So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.
It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:
Observable.just("computation")
.subscribeOn(Schedulers.computation())
.subscribe(i -> result += Thread.currentThread().getName());
// the subscription runs asynchronously, so wait before asserting
Thread.sleep(500);
Assert.assertTrue(result.equals("RxComputationScheduler-1"));
If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.
By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It’s simply not possible to have more computation threads than cores.
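The capping rule described here can be written as a small pure function. This is a sketch of the rule as stated in the text, not a copy of RxJava’s code. The class and method names are hypothetical, and treating a missing or non-positive property value as "use the default" is an assumption.

```java
// Hypothetical illustration class; not part of RxJava.
final class ComputationThreadsSketch {

    // System property named in the text above.
    static final String KEY = "rx.scheduler.max-computation-threads";

    // Assumed rule: a non-positive request, or one larger than the number
    // of cores, falls back to the number of cores.
    static int effectiveThreads(int requested, int cores) {
        return (requested <= 0 || requested > cores) ? cores : requested;
    }

    // Applies the rule to the current JVM: reads the property (0 if unset)
    // and caps it by the number of available processors.
    static int effectiveThreads() {
        return effectiveThreads(
            Integer.getInteger(KEY, 0),
            Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(effectiveThreads());
    }
}
```

With 8 cores, for example, a requested value of 2 yields 2 threads, while a requested value of 16 is capped at 8. The property would typically be set at JVM startup with -Drx.scheduler.max-computation-threads=2.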
9. Schedulers.test
This Scheduler is used only for testing purposes, and we’ll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:
List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable
.interval(1, TimeUnit.SECONDS, scheduler);
Observable.from(letters)
.zipWith(tick, (string, index) -> index + "-" + string)
.subscribeOn(scheduler)
.subscribe(subscriber);
subscriber.assertNoValues();
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
subscriber.getOnNextEvents(),
hasItems("0-A", "1-B", "2-C"));
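To show what "advancing the clock" means mechanically, here is a minimal virtual-time scheduler in plain Java. It is a simplified model of the idea and not RxJava’s TestScheduler. The class VirtualClockSketch and its demo method are hypothetical, although the method names advanceTimeBy and advanceTimeTo mirror the calls used in the test above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not RxJava's TestScheduler.
final class VirtualClockSketch {

    private static final class Task {
        final long dueNanos;   // virtual time at which the action is due
        final long sequence;   // tie-breaker that preserves insertion order
        final Runnable action;

        Task(long dueNanos, long sequence, Runnable action) {
            this.dueNanos = dueNanos;
            this.sequence = sequence;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>(
        Comparator.<Task>comparingLong(t -> t.dueNanos)
            .thenComparingLong(t -> t.sequence));
    private long nowNanos;      // current virtual time, starts at zero
    private long nextSequence;

    void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Task(nowNanos + unit.toNanos(delay), nextSequence++, action));
    }

    void advanceTimeBy(long amount, TimeUnit unit) {
        advanceToNanos(nowNanos + unit.toNanos(amount));
    }

    void advanceTimeTo(long time, TimeUnit unit) {
        advanceToNanos(unit.toNanos(time));
    }

    // Runs every action that is due at or before the target time; nothing sleeps.
    private void advanceToNanos(long targetNanos) {
        while (!queue.isEmpty() && queue.peek().dueNanos <= targetNanos) {
            Task task = queue.poll();
            nowNanos = task.dueNanos;
            task.action.run();
        }
        nowNanos = Math.max(nowNanos, targetNanos);
    }

    // Schedules "0-A", "1-B", "2-C" one virtual second apart and records
    // what has been received before and after each clock movement.
    static List<String> demo() {
        VirtualClockSketch clock = new VirtualClockSketch();
        List<String> received = new ArrayList<>();
        List<String> snapshots = new ArrayList<>();
        String[] letters = {"A", "B", "C"};
        for (int i = 0; i < letters.length; i++) {
            String value = i + "-" + letters[i];
            clock.schedule(() -> received.add(value), i + 1, TimeUnit.SECONDS);
        }
        snapshots.add(received.toString());
        clock.advanceTimeBy(1, TimeUnit.SECONDS);
        snapshots.add(received.toString());
        clock.advanceTimeTo(3, TimeUnit.SECONDS);
        snapshots.add(received.toString());
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing is received until the clock moves, one value arrives after the first virtual second, and all three are present once the clock reaches three seconds, all without any real waiting.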
10. Default Schedulers
Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don’t operate on any particular Scheduler or operate on a particular default Scheduler.
For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
.delay(1, TimeUnit.SECONDS, schedulerA)
.subscribe(i -> result += Thread.currentThread().getName() + i + " ");
Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));
Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.
Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don’t provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.
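The reason a time-based operator needs its own Scheduler can be seen with the JDK’s ScheduledExecutorService. This sketch is an analogy only. The class name DelayHandOffSketch is hypothetical, and the thread name "Sched1-" is borrowed from the example above. The caller hands the value to a timer thread instead of sleeping, so the delayed delivery happens on that thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RxJava.
final class DelayHandOffSketch {

    // Delivers a value after the given delay and returns the name of the
    // thread on which the delivery happened.
    static String deliveryThreadAfterDelay(long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(
            r -> new Thread(r, "Sched1-"));
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();
        try {
            // The caller does not block for the delay; the timer thread delivers.
            timer.schedule(() -> {
                deliveredOn.complete(Thread.currentThread().getName());
            }, delayMillis, TimeUnit.MILLISECONDS);
            // Blocking here is only for the demo, to read the result.
            return deliveredOn.get(delayMillis + 5_000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadAfterDelay(100));
    }
}
```

The delivery thread is the timer’s thread rather than the caller’s, which is why everything downstream of a delay-like step continues on the operator’s Scheduler.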
11. Conclusion
In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.
Mastering schedulers is essential to writing scalable and safe code with RxJava. The difference between subscribeOn and observeOn is especially important under high load, where every task must be executed precisely when we expect.
Last but not least, we must be sure that Schedulers used downstream can keep up with the load generated by Schedulers upstream. For more information, see the article about backpressure.
The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.