1. 概述

本文将深入探讨 Java 9 的响应式流(Reactive Streams)。简单来说,我们将学习如何使用 Flow 类——它封装了构建响应式流处理逻辑的核心组件。

响应式流是一种带非阻塞背压的异步流处理标准,该标准在《响应式宣言》中定义。目前已有多种实现,例如 RxJava 或 Akka-Streams。

2. 响应式 API 核心概念

构建一个 Flow 需要三个核心抽象组件,通过组合它们可实现异步处理逻辑:

  • 发布者(Publisher):每个 Flow 都需要处理由 Publisher 实例发布的事件。它只有一个方法 subscribe(),订阅者(Subscriber)通过调用此方法接收事件。
  • 订阅者(Subscriber):消息接收方需实现 Subscriber 接口。这通常是流处理的终点,因为它的实例不会继续传递消息。可将其视为“数据槽(Sink)”,需重写四个方法:
    • onSubscribe()
    • onNext()
    • onError()
    • onComplete()
  • 处理器(Processor):若需转换消息并传递给下一个 Subscriber,需实现 Processor 接口。它既是 Subscriber(接收消息)又是 Publisher(处理后转发消息)。

3. 消息发布与消费

假设要构建一个简单的 FlowPublisher 发布消息,Subscriber 逐条消费消息。

3.1 实现订阅者

创建 EndSubscriber 类,实现 Subscriber 接口并重写方法:

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 请求第一条消息
    }
}

consumedElements 列表用于存储消费的消息(测试用)。

3.2 消息处理逻辑

实现剩余方法,核心是 onNext()——当 Publisher 发布新消息时调用:

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    consumedElements.add(item);
    subscription.request(1); // 继续请求下一条
}

⚠️ 关键点:在 onSubscribe() 和处理消息后,必须调用 subscription.request() 表示准备好接收更多消息。

最后实现错误处理和完成回调:

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Done");
}

3.3 测试消费流程

使用 SubmissionPublisher(实现 Publisher 的并发工具类)发布消息:

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit); // 发布消息
    publisher.close(); // 触发 onComplete()

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

输出结果:

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4. 消息转换处理

若需在发布者和订阅者之间添加转换逻辑,需实现 Processor

4.1 实现处理器

创建 TransformProcessor 类,同时继承 SubmissionPublisher(作为发布者)并实现 Processor(作为订阅者):

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {

    private Function<T, R> function;
    private Flow.Subscription subscription;

    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item)); // 转换并发布
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close(); // 关闭发布者
    }
}

4.2 测试转换流程

将字符串转换为整数:

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);

    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

关键点:调用 publisher.close() 会触发 TransformProcessoronComplete(),进而关闭后续处理器。

5. 通过 Subscription 控制消息需求

利用 Subscriptionrequest() 方法可精确控制消费数量,实现背压机制。

5.1 限制消费数量

修改 EndSubscriber,通过构造参数指定最大消费数:

public class EndSubscriber<T> implements Subscriber<T> {
 
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1); // 仅在未达上限时继续请求
        }
    }
    //...
}

5.2 测试背压控制

只消费第一条消息:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1); // 限制消费1条
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

尽管发布者发送了 6 条消息,订阅者仅消费第一条——通过 request(1) 实现了精确的流量控制。

6. 总结

本文介绍了 Java 9 响应式流的核心概念:

  • ✅ 构建由 PublisherSubscriber 组成的处理流
  • ✅ 使用 Processor 实现消息转换
  • ✅ 通过 Subscription 控制消费需求(背压机制)

所有示例代码可在 GitHub 项目 中获取,这是一个 Maven 项目,可直接导入运行。


原始标题:Java 9 Reactive Streams