1. Overview

In this quick tutorial, we’ll show how to write a custom operator using RxJava.

We’ll discuss how to build this simple operator, as well as a transformer, either as a class or as a simple function.

2. Maven Configuration

First, we need to make sure we have the rxjava dependency in pom.xml:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.0</version>
</dependency>

We can check the latest version of rxjava on Maven Central.

3. Custom Operator

We can create a custom operator by implementing the Operator interface. In the following example, we implement a simple operator that removes non-alphanumeric characters from a String:

import rx.Observable.Operator;
import rx.Subscriber;

public class ToCleanString implements Operator<String, String> {

    public static ToCleanString toCleanString() {
        return new ToCleanString();
    }

    private ToCleanString() {
        super();
    }

    @Override
    public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
        return new Subscriber<String>(subscriber) {
            @Override
            public void onCompleted() {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }

            @Override
            public void onNext(String item) {
                if (!subscriber.isUnsubscribed()) {
                    final String result = item.replaceAll("[^A-Za-z0-9]", "");
                    subscriber.onNext(result);
                }
            }
        };
    }
}

In the above example, we check that the subscriber is still subscribed before applying our operation and emitting the item, since doing that work for an unsubscribed subscriber would be unnecessary.

We also restrict instance creation to a static factory method, which reads more naturally when chaining methods with a static import.
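The effect of the subscription check can be sketched without RxJava. The following is a simplified model, not RxJava code: the Downstream class, the guardedOnNext method and the cleanCalls counter are all hypothetical names introduced only for illustration. It assumes a consumer that stops listening after the first item and shows that the cleaning work is skipped for the remaining items:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class UnsubscribeGuardModel {

    // Hypothetical stand-in for a downstream subscriber; not an RxJava type.
    static class Downstream {
        final List<String> received = new ArrayList<>();
        final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void onNext(String item) {
            received.add(item);
            // Simulate a consumer that only wants the first item.
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    // Counts how many times the cleaning work was actually performed.
    static int cleanCalls = 0;

    // Mirrors the guard in the operator's onNext: skip the work once nobody is listening.
    static void guardedOnNext(Downstream downstream, String item) {
        if (!downstream.isUnsubscribed()) {
            cleanCalls++;
            downstream.onNext(item.replaceAll("[^A-Za-z0-9]", ""));
        }
    }

    // Feeds three items through the guard and returns the downstream for inspection.
    static Downstream run() {
        cleanCalls = 0;
        Downstream downstream = new Downstream();
        for (String item : new String[] {"john_1", "tom-3", "ann#7"}) {
            guardedOnNext(downstream, item);
        }
        return downstream;
    }

    public static void main(String[] args) {
        Downstream downstream = run();
        System.out.println(downstream.received + " after " + cleanCalls + " cleaning call(s)");
    }
}
```

In this model only the first item is cleaned and delivered; the other two are dropped by the guard before any work is done.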

And now, we can use the lift operator to chain our custom operator easily with other operators:

observable.lift(toCleanString())....

Here is a simple test of our custom operator:

@Test
public void whenUseCleanStringOperator_thenSuccess() {
    List<String> list = Arrays.asList("john_1", "tom-3");
    List<String> results = new ArrayList<>();
    Observable<String> observable = Observable
      .from(list)
      .lift(toCleanString());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems("john1", "tom3"));
}
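To see exactly which characters the operator drops, the same regular expression can be exercised on its own, without RxJava. This is a minimal sketch; the class name CleanStringDemo and the helper method clean are hypothetical and not part of the article's source code. Note that the character class keeps only ASCII letters and digits, so whitespace and accented letters are removed as well:

```java
public class CleanStringDemo {

    // Hypothetical helper: applies the same regex the operator uses in onNext.
    static String clean(String item) {
        return item.replaceAll("[^A-Za-z0-9]", "");
    }

    public static void main(String[] args) {
        System.out.println(clean("john_1"));     // underscore is removed
        System.out.println(clean("tom-3"));      // hyphen is removed
        System.out.println(clean("a b\tc"));     // space and tab are removed
        System.out.println(clean("caf\u00e9"));  // the accented letter is removed
    }
}
```

If non-ASCII letters should be kept, a Unicode-aware character class would be needed instead; that is a design choice outside the scope of the operator shown here.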

4. Transformer

We can also create our operator by implementing the Transformer interface:

import rx.Observable;
import rx.Observable.Transformer;

public class ToLength implements Transformer<String, Integer> {

    public static ToLength toLength() {
        return new ToLength();
    }

    private ToLength() {
        super();
    }

    @Override
    public Observable<Integer> call(Observable<String> source) {
        return source.map(String::length);
    }
}

Note that the toLength transformer converts our Observable<String> into an Observable<Integer> that emits the length of each String.

We need the compose operator to use our transformer:

observable.compose(toLength())...

Here is a simple test:

@Test
public void whenUseToLengthOperator_thenSuccess() {
    List<String> list = Arrays.asList("john", "tom");
    List<Integer> results = new ArrayList<>();
    Observable<Integer> observable = Observable
      .from(list)
      .compose(toLength());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(4, 3));
}

The lift(Operator) method operates on the observable’s subscribers, whereas compose(Transformer) works on the observable itself.

When we create a custom operator, we should pick Transformer if we want to operate on the observable as a whole, and Operator if we want to operate on the individual items emitted by the observable.
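The difference between the two can be made concrete with a small, dependency-free model. This is only a sketch of the idea, not RxJava's implementation: the Source interface and everything in LiftVsComposeModel are hypothetical names, and the model deliberately ignores errors, completion, unsubscription and backpressure. It shows that a lift-style operator receives the downstream consumer and wraps it, while a compose-style transformer receives the source itself and returns a new one:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class LiftVsComposeModel {

    // Simplified stand-in for an observable: it pushes items to a consumer.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);

        // Model of lift: the operator maps the downstream consumer to a
        // wrapped consumer, which is then fed by this (upstream) source.
        default <R> Source<R> lift(Function<Consumer<R>, Consumer<T>> operator) {
            return downstream -> this.subscribe(operator.apply(downstream));
        }

        // Model of compose: the transformer is handed the source itself.
        default <R> Source<R> compose(Function<Source<T>, Source<R>> transformer) {
            return transformer.apply(this);
        }

        // map expressed through lift, to give the transformer something to chain.
        default <R> Source<R> map(Function<T, R> fn) {
            return this.<R>lift(downstream -> item -> downstream.accept(fn.apply(item)));
        }

        static <T> Source<T> from(List<T> items) {
            return downstream -> items.forEach(downstream);
        }
    }

    // Operator-style: works on individual items on their way to the subscriber.
    static Function<Consumer<String>, Consumer<String>> toCleanString() {
        return downstream -> item -> downstream.accept(item.replaceAll("[^A-Za-z0-9]", ""));
    }

    // Transformer-style: works on the source as a whole by chaining operators.
    static Function<Source<String>, Source<Integer>> toLength() {
        return source -> source.map(String::length);
    }

    // Cleans each item, then converts it to its length.
    static List<Integer> run() {
        List<Integer> results = new ArrayList<>();
        Source.from(Arrays.asList("john_1", "tom-3"))
            .lift(toCleanString())
            .compose(toLength())
            .subscribe(results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model, a transformer is simply a reusable chain of existing operators, which is why it suits changes to the stream as a whole, whereas the lift-style operator has to deal with each item as it is delivered.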

5. Custom Operator as a Function

We can also implement our custom operator as a function instead of a public class:

Operator<String, String> cleanStringFn = subscriber -> {
    return new Subscriber<String>(subscriber) {
        @Override
        public void onCompleted() {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(t);
            }
        }

        @Override
        public void onNext(String str) {
            if (!subscriber.isUnsubscribed()) {
                String result = str.replaceAll("[^A-Za-z0-9]", "");
                subscriber.onNext(result);
            }
        }
    };
};

And here’s the simple test:

List<String> results = new ArrayList<>();
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
  .lift(cleanStringFn)
  .subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems("apple", "orange"));

Similarly, for the Transformer example:

@Test
public void whenUseFunctionTransformer_thenSuccess() {
    Transformer<String, Integer> toLengthFn = s -> s.map(String::length);

    List<Integer> results = new ArrayList<>();
    Observable.from(Arrays.asList("apple", "orange"))
      .compose(toLengthFn)
      .subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(5, 6));
}

6. Conclusion

In this article, we showed how to write our own RxJava operators.

And, as always, the full source code can be found over on GitHub.

