1. 引言
Ratpack是基于Netty引擎构建的框架,能快速构建HTTP应用。我们之前的文章已经介绍了其基础用法,本文将重点展示如何利用其流式API实现反应式应用。
2. 反应式流快速回顾
进入实战前,先快速回顾反应式应用的核心特征(根据原始定义):
- 响应式(Responsive)
- 弹性(Resilient)
- 可伸缩(Elastic)
- 消息驱动(Message Driven)
反应式流如何帮助实现这些特性?这里的"消息驱动"不一定指消息中间件,核心是异步请求处理和非阻塞背压支持。
Ratpack的反应式支持基于JVM的Reactive Streams API标准实现,因此可与Project Reactor、RxJava等框架互操作。
3. 使用Ratpack的Streams类
Ratpack的*Streams类提供多种工具方法创建Publisher*实例,用于构建数据处理管道。
从publish()
方法开始,它能将任何Iterable
转换为Publisher
:
Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
LoggingSubscriber
是测试用的Subscriber
实现,会记录每个发布对象。其block()
方法会阻塞调用者直到发布完成或出错。运行结果:
onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908
另一个实用方法是yield()
,通过Function
参数生成下一个对象:
@Test
public void whenYield_thenSuccess() {
Publisher<String> pub = Streams.yield((t) -> {
return t.getRequestNum() < 5 ? "hello" : null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
YieldRequest
的getRequestNum()
提供已发布对象计数,可用于终止流(返回null
)。创建周期性事件发布器:
@Test
public void whenPeriodic_thenSuccess() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
return t < 5 ? String.format("hello %d",t): null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
返回的发布器使用ScheduledExecutorService
周期调用生产函数,直到返回null
。生产函数接收已发布对象计数用于终止流。
4. 使用TransformablePublisher
细看Streams
方法会发现它们大多返回TransformablePublisher。此接口扩展了Publisher
,提供多种实用方法(类似Project Reactor的Flux
/Mono
),便于构建复杂处理管道。
示例:用map
将整数序列转为字符串:
@Test
public void whenMap_thenSuccess() throws Exception {
TransformablePublisher<String> pub = Streams.yield( t -> {
return t.getRequestNum() < 5 ? t.getRequestNum() : null;
})
.map(v -> String.format("item %d", v));
ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
assertTrue("should succeed", result.isSuccess());
assertEquals("should have 5 items",5,result.getValue().size());
}
实际执行在测试工具类ExecHarness管理的线程池中进行。因yieldSingle()
需要*Promise*,我们用toList()
适配——此方法收集所有结果到List
。
注意:文档警告此方法可能耗尽内存!建议仅用于单元测试。
除map()
外,TransformablePublisher
还提供:
-
filter()
:按Predicate
过滤上游对象 -
take()
:仅取前n个对象 -
wiretap()
:添加观测点检查流经数据 -
reduce()
:将上游对象归约为单值 -
transform()
:注入常规Publisher
5. 使用buffer()处理非规范Publisher
某些场景需处理无视请求数量、过度发送数据的Publisher。Ratpack的buffer()
方法可将多余数据暂存内存直到被消费。
创建非规范Publisher
(忽略请求数,始终多发送5个):
private class NonCompliantPublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
}
private class NonCompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel = 0;
public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
log.info("request: n={}", n);
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (n + 5) ; i ++ ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
}
}
先用LoggingSubscriber
测试,用take()
限制只取1项:
@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction(""))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
运行发现即使收到cancel()
,非规范发布器仍持续发送数据:
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
添加buffer()
并增加日志观察效果:
@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction("before buffer"))
.buffer()
.wiretap(new LoggingAction("after buffer"))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
日志显示非规范发布器发送了所有数据,但下游仍按需逐项发送:
LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214
6. 使用*batch()*优化慢速订阅者
下游订阅者请求数据量过小(如LoggingSubscriber
每次只请求1项)会降低吞吐量。实际应用中这会导致大量上下文切换,影响性能。batch()
方法允许上游使用更高效的请求大小,同时下游保持小请求量。
先看无batch
的情况:
@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction(""));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
CompliantPublisher
是测试发布器,生成0到n-1的整数。日志显示逐项发送:
CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331
添加batch()
使上游每次发送5项:
@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction("before batch"))
.batch(5, Action.noop())
.wiretap(new LoggingAction("after batch"));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
batch()
接收两个参数:每次请求的项数和处理丢弃项的Action
(如错误或取消时)。日志显示上游现在每次收到5项请求:
LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690
注意:因单线程执行,batch()
会持续缓冲直到收到onComplete()
信号。
7. 在Web应用中使用流
Ratpack支持将反应式流与异步Web框架结合。
7.1 接收数据流
通过处理器的Context
获取的Request
对象提供getBodyStream()
,返回ByteBuf
的TransformablePublisher
。可基于此构建处理管道:
@Bean
public Action<Chain> uploadFile() {
return chain -> chain.post("upload", ctx -> {
TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
pub.subscribe(new Subscriber<ByteBuf>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription sub) {
this.sub = sub;
sub.request(1);
}
@Override
public void onNext(ByteBuf t) {
try {
... do something useful with received data
sub.request(1);
}
finally {
// 切记释放!
t.release();
}
}
@Override
public void onError(Throwable t) {
ctx.getResponse().status(500);
}
@Override
public void onComplete() {
ctx.getResponse().status(202);
}
});
});
}
实现订阅者时注意两点:
- 必须调用
ByteBuf.release()
,否则会内存泄漏 - 异步处理只能用Ratpack原语(如
Promise
、Blocking
等)
7.2 发送数据流
最直接的方式是Response.sendStream()
,接收ByteBuf
发布器并发送数据,自动应用背压防溢出:
@Bean
public Action<Chain> download() {
return chain -> chain.get("download", ctx -> {
ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
});
}
缺点:不会自动设置任何头部(包括Content-Length
),可能影响客户端:
$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted
推荐方式:用处理器的Context.render()
传递ResponseChunks
对象。此时响应使用分块传输编码。创建ResponseChunks的简单方法:
@Bean
public Action<Chain> downloadChunks() {
return chain -> chain.get("downloadChunks", ctx -> {
ctx.render(ResponseChunks.bufferChunks("application/octetstream",
new RandomBytesPublisher(1024,512)));
});
}
现在响应包含content-type
头部:
$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted
7.3 使用服务器推送事件(SSE)
SSE支持也通过render()
实现,使用ServerSentEvents将Producer
的项适配为带元数据的Event
对象:
@Bean
public Action<Chain> quotes() {
ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
evt
.id(Long.toString(idSeq.incrementAndGet()))
.event("quote")
.data( q -> q.toString());
});
return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}
QuotesService
是示例服务,定期生成随机报价。第二个参数是事件处理函数,添加id
、事件类型和载荷。用curl
测试:
$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]
... more quotes
7.4 广播WebSocket数据
可用Websockets.websocketBroadcast()将任意Publisher数据管道输出到WebSocket:
@Bean
public Action<Chain> quotesWS() {
Publisher<String> pub = Streams.transformable(quotesService.newTicker())
.map(Quote::toString);
return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}
复用之前的QuotesService
作为事件源。用curl
模拟WebSocket客户端:
$ curl --include -v \
--no-buffer \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
--header "Sec-WebSocket-Version: 13" \
http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted
8. 总结
本文探讨了Ratpack对反应式流的支持及其在不同场景的应用。完整示例代码见GitHub。