1. 引言

RxJava 的流行催生了众多扩展其功能的第三方库。这些库大多旨在解决开发者使用 RxJava 时遇到的典型问题,**RxRelay** 就是其中之一。

2. 关于 Subject 的踩坑

简单来说,SubjectObservableObserver 之间的桥梁。作为 Observer,它能订阅一个或多个 Observable 并接收事件;同时作为 Observable,它能向订阅者重放事件或发射新事件。更多 Subject 的细节可参考这篇文章

Subject 的一个常见坑是:一旦接收到 onComplete()onError(),它就彻底无法再传递数据。有时这是预期行为,但有时不是。当需要避免这种终止状态时,就该考虑使用 RxRelay 了。

3. Relay 基础

*本质上 Relay 就是去掉 onComplete()onError() 能力的 Subject,因此它能持续发射数据。*
这让我们在不同类型 API 之间搭建桥梁时,无需担心意外触发终止状态。

使用 RxRelay 需添加依赖:

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>

4. Relay 类型详解

库中提供了三种 Relay 类型,我们快速过一遍。

4.1. PublishRelay

特性:订阅后只接收后续发射的事件
⚠️ 无缓冲机制,行为类似冷 Observable

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // 第二个观察者只收到最后的事件
    secondObserver.assertValue(15);
}

4.2. BehaviorRelay

特性:订阅时立即接收最近的事件,以及之后所有事件
❌ 无默认值时首次订阅会收到空事件

创建方式对比

方法 行为 使用场景
createDefault(1) 提供默认值 需要兜底数据
create() 无默认值 允许空状态
// 带默认值版本
public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

// 无默认值版本
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

完整示例:

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

4.3. ReplayRelay

特性:缓冲所有事件,新订阅者会收到完整历史记录
⚠️ 内存占用需警惕!支持设置缓冲大小/时间上限

三种创建方式

  1. 无限制缓冲(默认)

    public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
        ReplayRelay<Integer> replayRelay = ReplayRelay.create();
        TestObserver<Integer> firstObserver = TestObserver.create();
        TestObserver<Integer> secondObserver = TestObserver.create();
        replayRelay.subscribe(firstObserver);
        replayRelay.accept(5);
        replayRelay.accept(10);
        replayRelay.accept(15);
        replayRelay.subscribe(secondObserver);
        firstObserver.assertValues(5, 10, 15);
        secondObserver.assertValues(5, 10, 15);
    }
    
  2. 限制缓冲大小(简单粗暴防OOM)

    public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
        ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
        TestObserver<Integer> firstObserver = TestObserver.create();
        replayRelay.accept(5);
        replayRelay.accept(10);
        replayRelay.accept(15);  // 超出缓冲大小,5被丢弃
        replayRelay.accept(20);  // 超出缓冲大小,10被丢弃
        replayRelay.subscribe(firstObserver);
        firstObserver.assertValues(15, 20);  // 只收到最后两个
    }
    
  3. 限制事件存活时间

    public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
        SingleScheduler scheduler = new SingleScheduler();
        ReplayRelay<Integer> replayRelay =
          ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
        TestObserver<Integer> firstObserver = TestObserver.create();
        replayRelay.accept(5);
        replayRelay.accept(10);
        Thread.sleep(3000);  // 事件过期
        replayRelay.subscribe(firstObserver);
        firstObserver.assertEmpty();  // 收不到任何事件
    }
    

5. 自定义 Relay

所有 Relay 类型都继承自抽象类 Relay,这意味着我们可以轻松实现自定义版本。只需实现三个核心方法:

  • accept():处理数据发送
  • hasObservers():检查观察者数量
  • subscribeActual():管理订阅逻辑

示例:随机发送事件的 Relay
(简单粗暴实现,仅作演示)

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();
    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        if (observers.isEmpty()) return;
        int observerIndex = random.nextInt(observers.size());
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return !observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("资源已释放")));
    }
}

测试验证(确保只有随机一个观察者收到事件):

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    
    // 必有一个观察者收到事件,另一个为空
    if (firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. 总结

本文介绍了 RxRelay 这个特殊的 Subject 变体——它通过移除终止事件的能力,解决了数据流意外中断的问题。核心优势包括:

  • ✅ 避免因 onComplete()/onError() 导致的数据流终止
  • ✅ 三种内置类型满足不同场景需求
  • ✅ 支持灵活的自定义扩展

进阶资料可查阅官方文档,完整代码示例见GitHub仓库


原始标题:Introduction to RxRelay for RxJava