1. 概述
本文将深入探讨 Java 9 的响应式流(Reactive Streams)。简单来说,我们将学习如何使用 Flow
类——它封装了构建响应式流处理逻辑的核心组件。
响应式流是一种带非阻塞背压的异步流处理标准,该标准在《响应式宣言》中定义。目前已有多种实现,例如 RxJava 或 Akka-Streams。
2. 响应式 API 核心概念
构建一个 Flow
需要三个核心抽象组件,通过组合它们可实现异步处理逻辑:
- 发布者(Publisher):每个
Flow
都需要处理由Publisher
实例发布的事件。它只有一个方法subscribe()
,订阅者(Subscriber)通过调用此方法接收事件。 - 订阅者(Subscriber):消息接收方需实现
Subscriber
接口。这通常是流处理的终点,因为它的实例不会继续传递消息。可将其视为“数据槽(Sink)”,需重写四个方法:onSubscribe()
onNext()
onError()
onComplete()
- 处理器(Processor):若需转换消息并传递给下一个
Subscriber
,需实现Processor
接口。它既是Subscriber
(接收消息)又是Publisher
(处理后转发消息)。
3. 消息发布与消费
假设要构建一个简单的 Flow
:Publisher
发布消息,Subscriber
逐条消费消息。
3.1 实现订阅者
创建 EndSubscriber
类,实现 Subscriber
接口并重写方法:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 请求第一条消息
}
}
consumedElements
列表用于存储消费的消息(测试用)。
3.2 消息处理逻辑
实现剩余方法,核心是 onNext()
——当 Publisher
发布新消息时调用:
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
consumedElements.add(item);
subscription.request(1); // 继续请求下一条
}
⚠️ 关键点:在 onSubscribe()
和处理消息后,必须调用 subscription.request()
表示准备好接收更多消息。
最后实现错误处理和完成回调:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
3.3 测试消费流程
使用 SubmissionPublisher
(实现 Publisher
的并发工具类)发布消息:
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit); // 发布消息
publisher.close(); // 触发 onComplete()
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
输出结果:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4. 消息转换处理
若需在发布者和订阅者之间添加转换逻辑,需实现 Processor
。
4.1 实现处理器
创建 TransformProcessor
类,同时继承 SubmissionPublisher
(作为发布者)并实现 Processor
(作为订阅者):
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item)); // 转换并发布
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close(); // 关闭发布者
}
}
4.2 测试转换流程
将字符串转换为整数:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
✅ 关键点:调用 publisher.close()
会触发 TransformProcessor
的 onComplete()
,进而关闭后续处理器。
5. 通过 Subscription 控制消息需求
利用 Subscription
的 request()
方法可精确控制消费数量,实现背压机制。
5.1 限制消费数量
修改 EndSubscriber
,通过构造参数指定最大消费数:
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1); // 仅在未达上限时继续请求
}
}
//...
}
5.2 测试背压控制
只消费第一条消息:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1); // 限制消费1条
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
尽管发布者发送了 6 条消息,订阅者仅消费第一条——通过 request(1)
实现了精确的流量控制。
6. 总结
本文介绍了 Java 9 响应式流的核心概念:
- ✅ 构建由
Publisher
和Subscriber
组成的处理流 - ✅ 使用
Processor
实现消息转换 - ✅ 通过
Subscription
控制消费需求(背压机制)
所有示例代码可在 GitHub 项目 中获取,这是一个 Maven 项目,可直接导入运行。