1. 概述
在 RxJava 中,多个订阅者(Subscriber)订阅同一个 Observable 时,默认行为并不总是符合预期。本文将深入探讨如何通过 ConnectableObservable
等机制,优雅地控制多个订阅者的行为,避免重复计算或资源浪费。
我们首先看默认行为带来的问题,然后逐步引入解决方案,包括 publish()
、connect()
、autoConnect()
和 refCount()
,并结合实际示例说明其适用场景。
2. 默认行为:每个订阅者独立触发数据流
当多个订阅者分别订阅一个普通的 Observable 时,每个订阅都会独立触发数据源的生成逻辑。这意味着如果数据获取是耗时操作(如网络请求、数据库查询),就会被重复执行,造成性能浪费。
来看一个简单例子:
private static Observable<Integer> getObservable() {
return Observable.create(subscriber -> {
subscriber.onNext(gettingValue(1));
subscriber.onNext(gettingValue(2));
subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
}));
});
}
private static Integer gettingValue(int i) {
LOGGER.info("Getting " + i);
return i;
}
我们创建两个订阅者:
LOGGER.info("Subscribing");
Observable<Integer> obs = getObservable();
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
s1.unsubscribe();
s2.unsubscribe();
输出结果:
Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources
✅ 关键点总结:
gettingValue()
被调用了两次(每个订阅者一次)- 资源清理也执行了两次
- 数据获取是“冷”的,每个订阅者都触发一次完整流程
⚠️ 这种行为在需要共享数据源时是不可接受的。我们需要让多个订阅者共享同一个数据流实例。
3. 使用 ConnectableObservable 实现共享订阅
ConnectableObservable
是解决该问题的核心工具。它允许将一个普通的 Observable 转换为“可连接”的形式,只有显式调用 connect()
时才会真正启动数据流,从而实现多个订阅者共享同一份数据。
3.1 创建 ConnectableObservable:publish()
使用 publish()
方法将普通 Observable 转换为 ConnectableObservable
:
ConnectableObservable<Integer> obs = getObservable().publish();
⚠️ 注意:此时数据流并未启动,即使有订阅者也不会触发 onNext
。
3.2 手动控制启动:connect()
调用 connect()
方法才会真正激活数据流:
LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();
输出:
Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources
✅ 效果:
gettingValue()
只执行一次- 资源清理只执行一次
- 两个订阅者共享同一份数据流
⚠️ 这种“先订阅,后连接”的模式非常适合需要确保所有订阅者都能收到完整数据序列的场景。
3.3 保持数据流一致性:connect() 时机控制
场景:Hot Observable(热数据流)
假设我们有一个基于鼠标点击的 Observable,它持续发射事件,与订阅时间无关:
private static Observable<Integer> getObservable() {
return Observable.create(subscriber -> {
frame.addMouseListener(new MouseAdapter() {
@Override
public void mouseClicked(MouseEvent e) {
subscriber.onNext(e.getX());
}
});
subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
for (MouseListener listener : frame.getListeners(MouseListener.class)) {
frame.removeMouseListener(listener);
}
}));
});
}
默认行为(无 connect)
public static void defaultBehaviour() throws InterruptedException {
Observable<Integer> obs = getObservable();
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(i ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(i ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
// ... unsubscribe logic
}
输出(部分):
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
❌ 问题:第二个订阅者错过了前两次点击。
使用 connect() 保证一致性
public static void subscribeBeforeConnect() throws InterruptedException {
ConnectableObservable<Integer> obs = getObservable().publish();
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(i ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(i ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("connecting:");
Subscription s = obs.connect();
// ... unsubscribe
}
输出:
subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
✅ 效果:两个订阅者从 connect()
之后的事件开始,同步接收相同的数据流。
3.4 提前启动数据流:connect() 在 subscribe() 之前
有时我们希望数据流尽早开始,即使还没有订阅者。这在需要“预热”或后台处理数据时很有用。
public static void connectBeforeSubscribe() throws InterruptedException {
ConnectableObservable<Integer> obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x))
.publish();
LOGGER.info("connecting:");
Subscription s = obs.connect(); // 数据流立即启动
Thread.sleep(1000);
LOGGER.info("subscribing #1");
obs.subscribe(i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
obs.subscribe(i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
s.unsubscribe();
}
输出:
connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
✅ 关键机制:
connect()
返回的Subscription
代表一个“虚拟订阅者”- 该订阅者持续消费数据,即使没有真实订阅者
- 真实订阅者加入后,自动接收后续数据
3.5 自动连接:autoConnect()
autoConnect()
在第一个订阅者到来时自动调用 connect()
,无需手动控制。
public static void autoConnectAndSubscribe() throws InterruptedException {
Observable<Integer> obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x))
.publish()
.autoConnect();
LOGGER.info("autoconnect()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription s1 = obs.subscribe(i ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription s2 = obs.subscribe(i ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
// ... unsubscribe
}
⚠️ 注意: autoConnect()
不会在最后一个订阅者取消后断开连接。即使所有真实订阅者都已取消,后台的“虚拟订阅者”仍会继续消费数据。
输出末尾:
unsubscribe 2
saving 278
saving 268
❌ 风险: 可能导致资源泄漏或不必要的后台处理。
3.6 引用计数自动连接:refCount()
refCount()
是 autoConnect()
的“智能”版本。它在第一个订阅者到来时自动连接,在最后一个订阅者取消时自动断开。
public static void refCountAndSubscribe() throws InterruptedException {
Observable<Integer> obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x))
.publish()
.refCount();
LOGGER.info("refcount()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(i ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(i ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("unsubscribe#1");
subscription1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe#2");
subscription2.unsubscribe();
}
输出末尾:
unsubscribe#2
clearing resources
✅ 优势:
- 自动管理连接生命周期
- 避免资源泄漏
- 最适合“按需启动、用完即停”的场景
4. 总结
方法 | 连接时机 | 断开时机 | 适用场景 |
---|---|---|---|
connect() |
手动调用 | 手动调用 unsubscribe() |
需要精确控制启动/停止 |
autoConnect() |
第一个订阅者 | 永不自动断开 | 后台持续运行,不关心订阅状态 |
refCount() |
第一个订阅者 | 最后一个订阅者取消 | 按需启动,资源敏感型应用 ✅ 推荐 |
📌 核心建议:
- 对于大多数需要共享数据流的场景,**优先使用
refCount()
**,简单粗暴且安全。 - 需要延迟启动或预热数据时,使用
connect()
手动控制。 - 谨慎使用
autoConnect()
,避免后台无限运行导致的性能问题。
完整示例代码已托管至 GitHub: https://github.com/techlead-pro/rxjava-examples