1. 概述

在 RxJava 中,多个订阅者(Subscriber)订阅同一个 Observable 时,默认行为并不总是符合预期。本文将深入探讨如何通过 ConnectableObservable 等机制,优雅地控制多个订阅者的行为,避免重复计算或资源浪费。

我们首先看默认行为带来的问题,然后逐步引入解决方案,包括 publish()connect()autoConnect()refCount(),并结合实际示例说明其适用场景。

2. 默认行为:每个订阅者独立触发数据流

当多个订阅者分别订阅一个普通的 Observable 时,每个订阅都会独立触发数据源的生成逻辑。这意味着如果数据获取是耗时操作(如网络请求、数据库查询),就会被重复执行,造成性能浪费。

来看一个简单例子:

private static Observable<Integer> getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));

        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}

我们创建两个订阅者:

LOGGER.info("Subscribing");

Observable<Integer> obs = getObservable();

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

输出结果:

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

关键点总结:

  • gettingValue() 被调用了两次(每个订阅者一次)
  • 资源清理也执行了两次
  • 数据获取是“冷”的,每个订阅者都触发一次完整流程

⚠️ 这种行为在需要共享数据源时是不可接受的。我们需要让多个订阅者共享同一个数据流实例

3. 使用 ConnectableObservable 实现共享订阅

ConnectableObservable 是解决该问题的核心工具。它允许将一个普通的 Observable 转换为“可连接”的形式,只有显式调用 connect() 时才会真正启动数据流,从而实现多个订阅者共享同一份数据。

3.1 创建 ConnectableObservable:publish()

使用 publish() 方法将普通 Observable 转换为 ConnectableObservable

ConnectableObservable<Integer> obs = getObservable().publish();

⚠️ 注意:此时数据流并未启动,即使有订阅者也不会触发 onNext

3.2 手动控制启动:connect()

调用 connect() 方法才会真正激活数据流:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));

Thread.sleep(1000);
LOGGER.info("Connecting");

Subscription s = obs.connect();
s.unsubscribe();

输出:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

效果:

  • gettingValue() 只执行一次
  • 资源清理只执行一次
  • 两个订阅者共享同一份数据流

⚠️ 这种“先订阅,后连接”的模式非常适合需要确保所有订阅者都能收到完整数据序列的场景。

3.3 保持数据流一致性:connect() 时机控制

场景:Hot Observable(热数据流)

假设我们有一个基于鼠标点击的 Observable,它持续发射事件,与订阅时间无关:

private static Observable<Integer> getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}
默认行为(无 connect)
public static void defaultBehaviour() throws InterruptedException {
    Observable<Integer> obs = getObservable();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(i -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(i -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    // ... unsubscribe logic
}

输出(部分):

subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343

❌ 问题:第二个订阅者错过了前两次点击。

使用 connect() 保证一致性
public static void subscribeBeforeConnect() throws InterruptedException {
    ConnectableObservable<Integer> obs = getObservable().publish();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(i -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(i -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    // ... unsubscribe
}

输出:

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317

✅ 效果:两个订阅者从 connect() 之后的事件开始,同步接收相同的数据流

3.4 提前启动数据流:connect() 在 subscribe() 之前

有时我们希望数据流尽早开始,即使还没有订阅者。这在需要“预热”或后台处理数据时很有用。

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable<Integer> obs = getObservable()
        .doOnNext(x -> LOGGER.info("saving " + x))
        .publish();

    LOGGER.info("connecting:");
    Subscription s = obs.connect(); // 数据流立即启动
    Thread.sleep(1000);

    LOGGER.info("subscribing #1");
    obs.subscribe(i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);

    LOGGER.info("subscribing #2");
    obs.subscribe(i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    
    s.unsubscribe();
}

输出:

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377

关键机制:

  • connect() 返回的 Subscription 代表一个“虚拟订阅者”
  • 该订阅者持续消费数据,即使没有真实订阅者
  • 真实订阅者加入后,自动接收后续数据

3.5 自动连接:autoConnect()

autoConnect() 在第一个订阅者到来时自动调用 connect(),无需手动控制。

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable<Integer> obs = getObservable()
        .doOnNext(x -> LOGGER.info("saving " + x))
        .publish()
        .autoConnect();

    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe(i -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe(i -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    // ... unsubscribe
}

⚠️ 注意: autoConnect() 不会在最后一个订阅者取消后断开连接。即使所有真实订阅者都已取消,后台的“虚拟订阅者”仍会继续消费数据。

输出末尾:

unsubscribe 2
saving 278
saving 268

风险: 可能导致资源泄漏不必要的后台处理

3.6 引用计数自动连接:refCount()

refCount()autoConnect() 的“智能”版本。它在第一个订阅者到来时自动连接,在最后一个订阅者取消时自动断开。

public static void refCountAndSubscribe() throws InterruptedException {
    Observable<Integer> obs = getObservable()
        .doOnNext(x -> LOGGER.info("saving " + x))
        .publish()
        .refCount();

    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(i -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(i -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}

输出末尾:

unsubscribe#2
clearing resources

优势:

  • 自动管理连接生命周期
  • 避免资源泄漏
  • 最适合“按需启动、用完即停”的场景

4. 总结

方法 连接时机 断开时机 适用场景
connect() 手动调用 手动调用 unsubscribe() 需要精确控制启动/停止
autoConnect() 第一个订阅者 永不自动断开 后台持续运行,不关心订阅状态
refCount() 第一个订阅者 最后一个订阅者取消 按需启动,资源敏感型应用 ✅ 推荐

📌 核心建议:

  • 对于大多数需要共享数据流的场景,**优先使用 refCount()**,简单粗暴且安全。
  • 需要延迟启动或预热数据时,使用 connect() 手动控制。
  • 谨慎使用 autoConnect(),避免后台无限运行导致的性能问题。

完整示例代码已托管至 GitHub: https://github.com/techlead-pro/rxjava-examples


原始标题:RxJava One Observable, Multiple Subscribers