1. 引言
RxJava 的流行催生了众多扩展其功能的第三方库。这些库大多旨在解决开发者使用 RxJava 时遇到的典型问题,**RxRelay** 就是其中之一。
2. 关于 Subject 的踩坑
简单来说,Subject 是 Observable 和 Observer 之间的桥梁。作为 Observer,它能订阅一个或多个 Observable 并接收事件;同时作为 Observable,它能向订阅者重放事件或发射新事件。更多 Subject 的细节可参考这篇文章。
Subject 的一个常见坑是:一旦接收到 onComplete()
或 onError()
,它就彻底无法再传递数据。有时这是预期行为,但有时不是。当需要避免这种终止状态时,就该考虑使用 RxRelay 了。
3. Relay 基础
*本质上 Relay 就是去掉 onComplete()
和 onError()
能力的 Subject,因此它能持续发射数据。*
这让我们在不同类型 API 之间搭建桥梁时,无需担心意外触发终止状态。
使用 RxRelay 需添加依赖:
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>1.2.0</version>
</dependency>
4. Relay 类型详解
库中提供了三种 Relay 类型,我们快速过一遍。
4.1. PublishRelay
✅ 特性:订阅后只接收后续发射的事件
⚠️ 无缓冲机制,行为类似冷 Observable
public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
// 第二个观察者只收到最后的事件
secondObserver.assertValue(15);
}
4.2. BehaviorRelay
✅ 特性:订阅时立即接收最近的事件,以及之后所有事件
❌ 无默认值时首次订阅会收到空事件
创建方式对比:
方法 | 行为 | 使用场景 |
---|---|---|
createDefault(1) |
提供默认值 | 需要兜底数据 |
create() |
无默认值 | 允许空状态 |
// 带默认值版本
public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}
// 无默认值版本
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
完整示例:
public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}
4.3. ReplayRelay
✅ 特性:缓冲所有事件,新订阅者会收到完整历史记录
⚠️ 内存占用需警惕!支持设置缓冲大小/时间上限
三种创建方式:
无限制缓冲(默认)
public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() { ReplayRelay<Integer> replayRelay = ReplayRelay.create(); TestObserver<Integer> firstObserver = TestObserver.create(); TestObserver<Integer> secondObserver = TestObserver.create(); replayRelay.subscribe(firstObserver); replayRelay.accept(5); replayRelay.accept(10); replayRelay.accept(15); replayRelay.subscribe(secondObserver); firstObserver.assertValues(5, 10, 15); secondObserver.assertValues(5, 10, 15); }
限制缓冲大小(简单粗暴防OOM)
public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() { ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2); TestObserver<Integer> firstObserver = TestObserver.create(); replayRelay.accept(5); replayRelay.accept(10); replayRelay.accept(15); // 超出缓冲大小,5被丢弃 replayRelay.accept(20); // 超出缓冲大小,10被丢弃 replayRelay.subscribe(firstObserver); firstObserver.assertValues(15, 20); // 只收到最后两个 }
限制事件存活时间
public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() { SingleScheduler scheduler = new SingleScheduler(); ReplayRelay<Integer> replayRelay = ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler); TestObserver<Integer> firstObserver = TestObserver.create(); replayRelay.accept(5); replayRelay.accept(10); Thread.sleep(3000); // 事件过期 replayRelay.subscribe(firstObserver); firstObserver.assertEmpty(); // 收不到任何事件 }
5. 自定义 Relay
所有 Relay 类型都继承自抽象类 Relay
,这意味着我们可以轻松实现自定义版本。只需实现三个核心方法:
accept()
:处理数据发送hasObservers()
:检查观察者数量subscribeActual()
:管理订阅逻辑
示例:随机发送事件的 Relay
(简单粗暴实现,仅作演示)
public class RandomRelay extends Relay<Integer> {
Random random = new Random();
List<Observer<? super Integer>> observers = new ArrayList<>();
@Override
public void accept(Integer integer) {
if (observers.isEmpty()) return;
int observerIndex = random.nextInt(observers.size());
observers.get(observerIndex).onNext(integer);
}
@Override
public boolean hasObservers() {
return !observers.isEmpty();
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(
() -> System.out.println("资源已释放")));
}
}
测试验证(确保只有随机一个观察者收到事件):
public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
// 必有一个观察者收到事件,另一个为空
if (firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}
6. 总结
本文介绍了 RxRelay 这个特殊的 Subject 变体——它通过移除终止事件的能力,解决了数据流意外中断的问题。核心优势包括:
- ✅ 避免因
onComplete()
/onError()
导致的数据流终止 - ✅ 三种内置类型满足不同场景需求
- ✅ 支持灵活的自定义扩展