1. 概述

在这篇文章中,我们将探讨如何测试使用 RxJava 编写的代码。

在RxJava中,典型的流程由一个Observable和一个Observer组成。Observable是一个数据源,它是一个元素序列。一个或多个观察者订阅它以接收发出的事件。

通常情况下,观察者和Observable在异步方式下在不同的线程中执行——这使得传统的测试方式变得困难。

幸运的是,RxJava提供了TestSubscriber类,让我们能够测试异步、事件驱动的流程。

2. 使用传统方法测试RxJava

我们先来看一个例子:我们有一个字母序列,想要与从1开始的整数序列进行zip操作。

我们的测试应该断言,监听zipObservable发出的事件的订阅者会接收到字母和整数的zip结果。

用传统方式编写这样的测试意味着我们需要维护一个结果列表,并从观察者更新这个列表。向整数列表添加元素意味着Observable和观察者需要在同一线程中工作——它们不能异步工作。

因此,我们将错过RxJava的一大优势:事件在单独线程中的处理。

以下是这种受限版本测试的大致样子:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
  .from(letters)
  .zipWith(
     Observable.range(1, Integer.MAX_VALUE), 
     (string, index) -> index + "-" + string);

observable.subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(5));
assertThat(results, hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

我们通过将元素添加到results列表中来聚合观察者的结果。观察者和Observable在同一线程中工作,所以我们的断言块正确地等待subscribe()方法完成。

3. 使用TestSubscriber测试RxJava

RxJava自带了一个TestSubscriber类,让我们能够编写与异步事件处理相关的测试。这是一个正常的观察者,它订阅Observable

在测试中,我们可以检查TestSubscriber的状态并对其状态进行断言:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(
    Observable.range(1, Integer.MAX_VALUE), 
    ((string, index) -> index + "-" + string));

observable.subscribe(subscriber);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(
  subscriber.getOnNextEvents(),
  hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

我们将TestSubscriber实例传递给Observablesubscribe()方法。然后我们可以检查这个订阅者的状态。

TestSubscriber有一些非常有用的断言方法,我们将使用它们来验证我们的期望。订阅者应该接收观察者发出的5个元素,我们通过调用assertValueCount()方法来确认这一点。

我们可以调用getOnNextEvents()方法来检查订阅者接收到的所有事件。

调用assertCompleted()方法检查观察者订阅的流是否已完成。assertNoErrors()方法断言订阅流时没有错误。

4. 测试预期异常

有时,在Observable发出事件或观察者处理事件的过程中,会发生错误。TestSubscriber有一个特殊的方法来检查错误状态,即带异常类型参数的assertError()方法:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string))
  .concatWith(Observable.error(new RuntimeException("error in Observable")));

observable.subscribe(subscriber);

subscriber.assertError(RuntimeException.class);
subscriber.assertNotCompleted();

我们使用concatWith()方法将两个Observable合并在一起创建Observable。第二个Observable在发出下一个事件时抛出一个RuntimeException。我们可以通过调用assertError()方法检查TestSubscriber上的异常类型。

当观察者接收到错误时,它停止处理,最终处于未完成状态。可以使用assertNotCompleted()方法检查这个状态。

**5. 测试基于时间的`Observable```

假设我们有一个每秒发出一次事件的Observable,并且想使用TestSubscriber测试这种行为。

我们可以使用Observable.interval()方法定义一个基于时间的Observable,并传入一个TimeUnit作为参数:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);

Observable<String> observable = Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string);

observable.subscribeOn(scheduler)
  .subscribe(subscriber);

tick``Observable每秒将发出一个新的值。

在测试开始时,我们在时间零点,所以TestSubscriber不会完成:

subscriber.assertNoValues();
subscriber.assertNotCompleted();

为了在测试中模拟时间流逝,我们需要使用[TestScheduler](http://reactivex.io/RxJava/javadoc/rx/schedulers/TestScheduler.html)类。我们可以使用advanceTimeBy()方法在TestScheduler上模拟一秒的流逝:

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

advanceTimeBy()方法会让Observable产生一个事件。我们可以调用assertValueCount()方法来确认一个事件被产生。

我们的letters列表中有5个元素,因此当我们想让Observable发出所有事件时,需要经过6秒的处理。为了模拟这6秒,我们可以使用advanceTimeTo()方法:

scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
 
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C", "3-D", "4-E"));

模拟了流逝的时间后,我们可以对TestSubscriber执行断言。我们可以调用assertValueCount()方法来断言所有事件都被产生了。

6. 总结

在这篇文章中,我们探讨了如何在RxJava中测试观察者和Observable。我们研究了测试发出的事件、错误和基于时间的Observable的方式。

所有这些示例和代码片段的实现可以在GitHub项目中找到——这是一个Maven项目,导入并运行起来应该很容易。


« 上一篇: Java实现遗传算法
» 下一篇: Java 8中Infinite Stream