1. 概述

本文将演示如何在 RxJava 中实现自定义操作符。我们将探讨两种实现方式:通过 Operator 接口和 Transformer 接口,以及如何用函数式方式实现。这些技术能帮你扩展 RxJava 的功能,解决特定业务场景的需求。

2. Maven 配置

首先确保在 pom.xml 中添加 rxjava 依赖:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.0</version>
</dependency>

最新版本可在 Maven Central 查询。

3. 自定义 Operator

通过实现 Operator 接口创建自定义操作符。以下示例实现一个过滤非字母数字字符的操作符:

public class ToCleanString implements Operator<String, String> {

    public static ToCleanString toCleanString() {
        return new ToCleanString();
    }

    private ToCleanString() {
        super();
    }

    @Override
    public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
        return new Subscriber<String>(subscriber) {
            @Override
            public void onCompleted() {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }

            @Override
            public void onNext(String item) {
                if (!subscriber.isUnsubscribed()) {
                    final String result = item.replaceAll("[^A-Za-z0-9]", "");
                    subscriber.onNext(result);
                }
            }
        };
    }
}

⚠️ 关键点

  • 必须检查 subscriber.isUnsubscribed() 避免无效操作
  • 使用静态工厂方法提升链式调用可读性
  • 通过 lift() 操作符集成到流中:
observable.lift(toCleanString())....

测试用例验证功能:

@Test
public void whenUseCleanStringOperator_thenSuccess() {
    List<String> list = Arrays.asList("john_1", "tom-3");
    List<String> results = new ArrayList<>();
    Observable<String> observable = Observable
      .from(list)
      .lift(toCleanString());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems("john1", "tom3"));
}

4. Transformer 方式

通过实现 Transformer 接口创建操作符:

public class ToLength implements Transformer<String, Integer> {

    public static ToLength toLength() {
        return new ToLength();
    }

    private ToLength() {
        super();
    }

    @Override
    public Observable<Integer> call(Observable<String> source) {
        return source.map(String::length);
    }
}

核心差异

  • Transformer 操作整个 Observable 流
  • Operator 操作单个发射项
  • 使用 compose() 而非 lift()
observable.compose(toLength())...

测试用例:

@Test
public void whenUseToLengthOperator_thenSuccess() {
    List<String> list = Arrays.asList("john", "tom");
    List<Integer> results = new ArrayList<>();
    Observable<Integer> observable = Observable
      .from(list)
      .compose(toLength());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(4, 3));
}

5. 函数式实现

直接用 Lambda 表达式替代类实现,更简单粗暴:

Operator 函数式实现

Operator<String, String> cleanStringFn = subscriber -> {
    return new Subscriber<String>(subscriber) {
        @Override
        public void onCompleted() {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(t);
            }
        }

        @Override
        public void onNext(String str) {
            if (!subscriber.isUnsubscribed()) {
                String result = str.replaceAll("[^A-Za-z0-9]", "");
                subscriber.onNext(result);
            }
        }
    };
};

测试用例:

List<String> results = new ArrayList<>();
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
  .lift(cleanStringFn)
  .subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems("apple", "orange"));

Transformer 函数式实现

@Test
public void whenUseFunctionTransformer_thenSuccess() {
    Transformer<String, Integer> toLengthFn = s -> s.map(String::length);

    List<Integer> results = new ArrayList<>();
    Observable.from(Arrays.asList("apple", "orange"))
      .compose(toLengthFn)
      .subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(5, 6));
}

6. 总结

本文展示了三种实现 RxJava 自定义操作符的方式:

  1. Operator 接口:适合处理单个数据项
  2. Transformer 接口:适合处理整个数据流
  3. 函数式实现:简单场景下的轻量级方案

选择建议:

  • 需要操作流整体 → 用 Transformer
  • 需要操作单个项 → 用 Operator
  • 简单逻辑 → 优先函数式实现

完整源码可在 GitHub 获取。


原始标题:Implementing Custom Operators in RxJava

« 上一篇: Apache Commons Chain