1. 简介
在 RxJava 入门 之后,我们来深入探讨过滤操作符。本文将重点介绍过滤、跳过、时间过滤和一些更高级的过滤操作。
2. 基础过滤
当处理 Observable
时,有时需要从发射的数据项中筛选出特定子集。RxJava 提供了多种过滤能力来实现这个需求。我们从最基础的 filter
操作符开始。
2.1 filter
操作符
简单粗暴地说,filter
操作符通过 Predicate
条件过滤 Observable
,只保留满足条件的数据项。
下面演示如何筛选奇数:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.filter(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 3, 5, 7, 9);
2.2 take
操作符
take
操作符只保留前 n 个数据项,后续数据直接丢弃。
示例:只取前 3 个元素:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.take(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.3 takeWhile
操作符
takeWhile
持续发射数据直到遇到第一个不满足 Predicate
的元素,之后立即终止。
示例:只取小于 4 的连续元素:
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.4 takeFirst
操作符
当需要获取第一个满足条件的元素时,takeFirst()
是最佳选择。
示例:获取第一个大于 5 的数:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeFirst(x -> x > 5);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
2.5 first
和 firstOrDefault
操作符
类似行为可通过 first
实现:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.first();
filteredObservable.subscribe(subscriber);
subscriber.assertValue(1);
但若需指定空数据时的默认值,应使用 firstOrDefault
:
Observable<Integer> sourceObservable = Observable.empty();
Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.6 takeLast
操作符
takeLast
只保留最后 n 个元素,其他全部丢弃。
示例:只取最后 3 个元素:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.takeLast(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(8, 9, 10);
⚠️ 注意:此操作符会延迟所有元素发射,直到源 Observable
完成。
2.7 last
和 lastOrDefault
除了 takeLast(1)
,last
可直接获取最后一个元素,并支持条件过滤:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.last(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(9);
空数据或无匹配元素时,使用 lastOrDefault
返回默认值:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable =
sourceObservable.lastOrDefault(-1, i -> i > 10);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.8 elementAt
和 elementAtOrDefault
操作符
elementAt
通过索引获取指定位置的元素:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.elementAt(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
❌ 超出索引会抛 IndexOutOfBoundException
。改用 elementAtOrDefault
可避免:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable
= sourceObservable.elementAtOrDefault(7, -1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.9 ofType
操作符
当 Observable
发射混合类型数据时,ofType
可按类型过滤:
示例:只保留字符串类型:
Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();
Observable filteredObservable = sourceObservable.ofType(String.class);
filteredObservable.subscribe(subscriber);
subscriber.assertValues("two", "five");
3. 跳过操作
与过滤相反,RxJava 提供了跳过操作符作为过滤的补充。我们从 skip
开始。
3.1 skip
操作符
skip
直接丢弃前 n 个元素,保留后续所有元素。
示例:跳过前 4 个元素:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skip(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(5, 6, 7, 8, 9, 10);
3.2 skipWhile
操作符
skipWhile
丢弃所有满足条件的初始元素,直到遇到第一个不满足条件的元素,之后保留所有后续元素。
示例:跳过所有小于 4 的初始元素:
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.skipWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(4, 5, 4, 3, 2, 1);
3.3 skipLast
操作符
skipLast
丢弃最后 n 个元素,只保留前面的元素。
示例:跳过最后 5 个元素:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skipLast(5);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3, 4, 5);
3.4 distinct
和 distinctUntilChanged
操作符
distinct
过滤所有重复元素,只保留首次出现的元素:
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinct();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
distinctUntilChanged
只过滤连续重复的元素:
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 1, 3, 1);
3.5 ignoreElements
操作符
ignoreElements
直接丢弃所有元素,只保留终止通知(onComplete/onError):
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();
ignoredObservable.subscribe(subscriber);
subscriber.assertNoValues();
4. 时间过滤操作符
在处理 Observable
序列时,时间维度常被忽略,但RxJava 提供了基于时间的过滤能力。先定义一个每秒发射一个元素的测试 Observable
:
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable = Observable
.just(1, 2, 3, 4, 5, 6)
.zipWith(Observable.interval(
0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);
TestScheduler
是特殊调度器,可手动推进时间,便于测试。
4.1 sample
和 throttleLast
操作符
sample
按固定时间间隔采样,发射每个周期内的最后一个元素:
示例:每 2.5 秒采样一次:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> sampledObservable = timedObservable
.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);
sampledObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(3, 5, 6);
✅ throttleLast
与 sample
行为完全一致。
4.2 throttleFirst
操作符
throttleFirst
发射每个采样周期内的第一个元素(而非最后一个):
示例:每 4.1 秒取第一个元素:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.throttleFirst(4100L, TimeUnit.MILLISECONDS, testScheduler); // 原文此处单位错误,应为 MILLISECONDS
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 6);
4.3 debounce
和 throttleWithTimeout
操作符
debounce
只发射指定时间窗口内最后一个元素,丢弃中间所有元素:
示例:设置 2 秒防抖:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValue(6);
✅ throttleWithTimeout
与 debounce
行为一致。
4.4 timeout
操作符
timeout
在指定时间内未收到元素时,立即终止并抛 TimeoutException
:
示例:设置 500 毫秒超时:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertError(TimeoutException.class);
subscriber.assertValues(1);
5. 多 Observable 过滤
**RxJava 支持基于第二个 Observable
动态过滤源 Observable
**。先定义一个延迟 3 秒发射的 Observable
:
Observable<Integer> delayedObservable = Observable.just(1)
.delay(3, TimeUnit.SECONDS, testScheduler);
5.1 takeUntil
操作符
takeUntil
在第二个 Observable
发射元素或终止时,立即停止源 Observable
的发射:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.takeUntil(delayedObservable); // 原文此处代码误写为 skipUntil
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 2, 3);
5.2 skipUntil
操作符
skipUntil
丢弃源 Observable
的所有元素,直到第二个 Observable
发射元素,之后开始正常发射:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.skipUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(4, 5, 6);
6. 总结
本文全面介绍了 RxJava 的过滤操作符,每个操作符都提供了简洁的示例。实际开发中,根据场景选择合适的操作符能显著简化代码逻辑。
所有示例代码可在 GitHub 获取。