1. 概述
本文将演示如何在 RxJava 中实现自定义操作符。我们将探讨两种实现方式:通过 Operator
接口和 Transformer
接口,以及如何用函数式方式实现。这些技术能帮你扩展 RxJava 的功能,解决特定业务场景的需求。
2. Maven 配置
首先确保在 pom.xml
中添加 rxjava
依赖:
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.3.0</version>
</dependency>
最新版本可在 Maven Central 查询。
3. 自定义 Operator
通过实现 Operator
接口创建自定义操作符。以下示例实现一个过滤非字母数字字符的操作符:
public class ToCleanString implements Operator<String, String> {
public static ToCleanString toCleanString() {
return new ToCleanString();
}
private ToCleanString() {
super();
}
@Override
public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
return new Subscriber<String>(subscriber) {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
@Override
public void onNext(String item) {
if (!subscriber.isUnsubscribed()) {
final String result = item.replaceAll("[^A-Za-z0-9]", "");
subscriber.onNext(result);
}
}
};
}
}
⚠️ 关键点:
- 必须检查
subscriber.isUnsubscribed()
避免无效操作 - 使用静态工厂方法提升链式调用可读性
- 通过
lift()
操作符集成到流中:
observable.lift(toCleanString())....
测试用例验证功能:
@Test
public void whenUseCleanStringOperator_thenSuccess() {
List<String> list = Arrays.asList("john_1", "tom-3");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
.from(list)
.lift(toCleanString());
observable.subscribe(results::add);
assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems("john1", "tom3"));
}
4. Transformer 方式
通过实现 Transformer
接口创建操作符:
public class ToLength implements Transformer<String, Integer> {
public static ToLength toLength() {
return new ToLength();
}
private ToLength() {
super();
}
@Override
public Observable<Integer> call(Observable<String> source) {
return source.map(String::length);
}
}
✅ 核心差异:
Transformer
操作整个 Observable 流Operator
操作单个发射项- 使用
compose()
而非lift()
:
observable.compose(toLength())...
测试用例:
@Test
public void whenUseToLengthOperator_thenSuccess() {
List<String> list = Arrays.asList("john", "tom");
List<Integer> results = new ArrayList<>();
Observable<Integer> observable = Observable
.from(list)
.compose(toLength());
observable.subscribe(results::add);
assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems(4, 3));
}
5. 函数式实现
直接用 Lambda 表达式替代类实现,更简单粗暴:
Operator 函数式实现:
Operator<String, String> cleanStringFn = subscriber -> {
return new Subscriber<String>(subscriber) {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
@Override
public void onNext(String str) {
if (!subscriber.isUnsubscribed()) {
String result = str.replaceAll("[^A-Za-z0-9]", "");
subscriber.onNext(result);
}
}
};
};
测试用例:
List<String> results = new ArrayList<>();
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
.lift(cleanStringFn)
.subscribe(results::add);
assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems("apple", "orange"));
Transformer 函数式实现:
@Test
public void whenUseFunctionTransformer_thenSuccess() {
Transformer<String, Integer> toLengthFn = s -> s.map(String::length);
List<Integer> results = new ArrayList<>();
Observable.from(Arrays.asList("apple", "orange"))
.compose(toLengthFn)
.subscribe(results::add);
assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems(5, 6));
}
6. 总结
本文展示了三种实现 RxJava 自定义操作符的方式:
- Operator 接口:适合处理单个数据项
- Transformer 接口:适合处理整个数据流
- 函数式实现:简单场景下的轻量级方案
选择建议:
- 需要操作流整体 → 用
Transformer
- 需要操作单个项 → 用
Operator
- 简单逻辑 → 优先函数式实现
完整源码可在 GitHub 获取。