1. 引言
在这篇文章中,我们将初步了解 RSocket,以及它如何实现客户端与服务器之间的通信。
2. 什么是 RSocket?
RSocket 是一种用于分布式应用中的二进制、点对点通信协议。从这个角度来看,它是 HTTP 等协议的一种替代方案。
本文不会对 RSocket 和其他协议进行全面比较,而是聚焦于 RSocket 的一个核心特性:它的交互模型。
RSocket 提供了四种交互模型。接下来,我们将通过示例逐一介绍每种模型。
3. Maven 依赖
在我们的示例中,RSocket 只需要两个直接依赖:
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>
rsocket-core 和 rsocket-transport-netty 这两个依赖都可以在 Maven Central 找到。
⚠️ **注意:RSocket 库大量使用了 reactive streams**。本文中会频繁使用 Flux 和 Mono 类,因此建议读者对响应式编程有一定了解。
4. 服务端设置
首先,我们创建一个 Server 类:
public class Server {
private final Disposable server;
public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.subscribe();
}
public void dispose() {
this.server.dispose();
}
private class RSocketImpl extends AbstractRSocket {}
}
我们使用 RSocketFactory 来监听一个 TCP 端口。通过传入自定义的 RSocketImpl 来处理客户端请求。接下来,我们会在 RSocketImpl 中逐步添加方法。
启动服务端只需实例化即可:
Server server = new Server();
✅ 一个服务端实例可以处理多个连接,因此一个实例就足以支持我们所有的示例。
当我们完成操作后,调用 dispose 方法可以停止服务端并释放 TCP 端口。
5. 交互模型
5.1. 请求/响应(Request/Response)
RSocket 提供了请求/响应模型:每个请求都会收到一个响应。
在这个模型中,我们创建一个简单的服务,它会将客户端发送的消息原样返回。
我们先在 RSocketImpl 中添加一个方法:
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // 将 payload 原样返回
} catch (Exception x) {
return Mono.error(x);
}
}
该方法 requestResponse 每个请求返回一个响应结果,从返回类型 Mono
Payload 是包含消息内容和元数据的类,在所有交互模型中都会使用。Payload 内容是二进制的,但提供了方便的字符串处理方法。
接下来,我们创建客户端:
public class ReqResClient {
private final RSocket socket;
public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.block();
}
public void dispose() {
this.socket.dispose();
}
}
客户端通过 RSocketFactory.connect() 方法与服务端建立连接。使用 requestResponse 方法发送 payload 到服务端。
payload 包含传入的字符串。当 Mono
最后,运行集成测试来验证请求/响应:
@Test
public void whenSendingAString_thenRevceiveTheSameString() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";
assertEquals(string, client.callBlocking(string));
client.dispose();
}
5.2. 单向发送(Fire-and-Forget)
在 Fire-and-Forget 模型中,客户端不会收到服务端的响应。
本例中,客户端会每隔 50ms 发送一次模拟数据给服务端,服务端负责发布这些数据。
我们在 RSocketImpl 中添加处理方法:
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
dataPublisher.publish(payload); // 转发 payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}
这个方法与请求/响应类似,但 **返回的是 Mono
dataPublisher 是一个 org.reactivestreams.Publisher 实例,用于将 payload 传递给订阅者。我们将在后续 request/stream 示例中使用它。
接着,我们创建客户端:
public class FireNForgetClient {
private final RSocket socket;
private final List<Float> data;
public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
/** 每隔 50ms 发送一个 float 类型的二进制数据 */
public void sendData() {
data = Collections.unmodifiableList(generateData());
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.flatMap(socket::fireAndForget)
.blockLast();
}
// ...
}
连接方式与之前一致。
sendData() 方法使用 Flux 发送多个消息。对每条消息,我们调用 socket::fireAndForget。
⚠️ 必须订阅每个 Mono
flatMap 用于传递响应,blockLast 作为订阅者触发执行。
我们将在下一节测试 fire-and-forget 模型时,通过 request/stream 客户端接收这些数据。
5.3. 请求/流(Request/Stream)
在请求/流模型中,一个请求可以收到多个响应。我们可以基于 fire-and-forget 示例来演示这一点。
我们在 RSocketImpl 中添加新方法:
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.from(dataPublisher);
}
该方法 requestStream 返回一个 Flux
客户端代码如下:
public class ReqStreamClient {
private final RSocket socket;
public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}
public void dispose() {
this.socket.dispose();
}
}
连接方式不变。
在 getDataStream() 中,我们使用 socket.requestStream() 接收服务端返回的 Flux
测试代码如下,验证 fire-and-forget 到 request/stream 的完整流程:
@Test
public void whenSendingStream_thenReceiveTheSameStream() {
FireNForgetClient fnfClient = new FireNForgetClient();
ReqStreamClient streamClient = new ReqStreamClient();
List<Float> data = fnfClient.getData();
List<Float> dataReceived = new ArrayList<>();
Disposable subscription = streamClient.getDataStream()
.index()
.subscribe(
tuple -> {
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
dataReceived.add(tuple.getT2());
},
err -> LOG.error(err.getMessage())
);
fnfClient.sendData();
// ... dispose client & subscription
assertEquals("Wrong data count received", data.size(), dataReceived.size());
}
5.4. 通道(Channel)
通道模型支持双向通信,消息流可以在两个方向上异步流动。
我们通过一个简单的游戏模拟来测试。在这个游戏中,通道两端的玩家会随机发送消息给对方,对方则做出响应。
首先,在服务端添加处理方法:
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
return Flux.from(gameController);
}
该方法 requestChannel 同时处理输入和输出的 Payload 流。输入参数是一个来自客户端的 payload 流,每个 payload 会传给 gameController::processPayload 处理。
作为响应,我们返回一个新的 Flux 流给客户端,该流来自 gameController,它本身也是一个 Publisher。
GameController 类大致如下:
public class GameController implements Publisher<Payload> {
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
// 随机间隔发送 payload 消息给订阅者
}
public void processPayload(Payload payload) {
// 响应对方玩家的消息
}
}
当 GameController 被订阅时,它会开始发送消息。
客户端代码如下:
public class ChannelClient {
private final RSocket socket;
private final GameController gameController;
public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.gameController = new GameController("Client Player");
}
public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}
public void dispose() {
this.socket.dispose();
}
}
客户端和服务端类似,创建自己的 GameController 实例。
使用 socket.requestChannel() 发送 payload 流给服务端,服务端也会返回自己的 payload 流。
接收到的 payload 会传给 gameController::processPayload 处理。
在游戏模拟中,客户端和服务端是镜像的,双方都在发送和接收 payload 流。
这些流是独立运行的,无需同步。
最后,运行测试:
@Test
public void whenRunningChannelGame_thenLogTheResults() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}
6. 结论
本文介绍了 RSocket 提供的四种交互模型。建议访问 RSocket 官网 获取更深入的内容,特别是 FAQ 和 Motivations 文档,它们提供了很好的背景知识。