1. 引言

在这篇文章中,我们将初步了解 RSocket,以及它如何实现客户端与服务器之间的通信。

2. 什么是 RSocket?

RSocket 是一种用于分布式应用中的二进制、点对点通信协议。从这个角度来看,它是 HTTP 等协议的一种替代方案。

本文不会对 RSocket 和其他协议进行全面比较,而是聚焦于 RSocket 的一个核心特性:它的交互模型

RSocket 提供了四种交互模型。接下来,我们将通过示例逐一介绍每种模型。

3. Maven 依赖

在我们的示例中,RSocket 只需要两个直接依赖:

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>

rsocket-corersocket-transport-netty 这两个依赖都可以在 Maven Central 找到。

⚠️ **注意:RSocket 库大量使用了 reactive streams**。本文中会频繁使用 FluxMono 类,因此建议读者对响应式编程有一定了解。

4. 服务端设置

首先,我们创建一个 Server 类:

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP_PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

我们使用 RSocketFactory 来监听一个 TCP 端口。通过传入自定义的 RSocketImpl 来处理客户端请求。接下来,我们会在 RSocketImpl 中逐步添加方法。

启动服务端只需实例化即可:

Server server = new Server();

一个服务端实例可以处理多个连接,因此一个实例就足以支持我们所有的示例。

当我们完成操作后,调用 dispose 方法可以停止服务端并释放 TCP 端口。

5. 交互模型

5.1. 请求/响应(Request/Response)

RSocket 提供了请求/响应模型:每个请求都会收到一个响应

在这个模型中,我们创建一个简单的服务,它会将客户端发送的消息原样返回。

我们先在 RSocketImpl 中添加一个方法:

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload); // 将 payload 原样返回
    } catch (Exception x) {
        return Mono.error(x);
    }
}

该方法 requestResponse 每个请求返回一个响应结果,从返回类型 Mono 可以看出。

Payload 是包含消息内容和元数据的类,在所有交互模型中都会使用。Payload 内容是二进制的,但提供了方便的字符串处理方法。

接下来,我们创建客户端:

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

客户端通过 RSocketFactory.connect() 方法与服务端建立连接。使用 requestResponse 方法发送 payload 到服务端

payload 包含传入的字符串。Mono 返回后,我们可以使用 getDataUtf8() 获取响应中的字符串内容

最后,运行集成测试来验证请求/响应:

@Test
public void whenSendingAString_thenRevceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

5.2. 单向发送(Fire-and-Forget)

在 Fire-and-Forget 模型中,客户端不会收到服务端的响应

本例中,客户端会每隔 50ms 发送一次模拟数据给服务端,服务端负责发布这些数据。

我们在 RSocketImpl 中添加处理方法:

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload); // 转发 payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

这个方法与请求/响应类似,但 **返回的是 Mono 而不是 *Mono***。

dataPublisher 是一个 org.reactivestreams.Publisher 实例,用于将 payload 传递给订阅者。我们将在后续 request/stream 示例中使用它。

接着,我们创建客户端:

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    /** 每隔 50ms 发送一个 float 类型的二进制数据 */
    public void sendData() {
        data = Collections.unmodifiableList(generateData());
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

    // ... 
}

连接方式与之前一致。

sendData() 方法使用 Flux 发送多个消息。对每条消息,我们调用 socket::fireAndForget

⚠️ 必须订阅每个 Mono 响应,否则 socket::fireAndForget 不会执行

flatMap 用于传递响应,blockLast 作为订阅者触发执行。

我们将在下一节测试 fire-and-forget 模型时,通过 request/stream 客户端接收这些数据。

5.3. 请求/流(Request/Stream)

在请求/流模型中,一个请求可以收到多个响应。我们可以基于 fire-and-forget 示例来演示这一点。

我们在 RSocketImpl 中添加新方法:

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}

该方法 requestStream 返回一个 Flux。正如前文所述,fireAndForget 方法会将数据发布到 dataPublisher。现在我们使用它来创建一个流,实现异步数据传输。

客户端代码如下:

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          .onErrorReturn(null);
    }

    public void dispose() {
        this.socket.dispose();
    }
}

连接方式不变。

getDataStream() 中,我们使用 socket.requestStream() 接收服务端返回的 Flux。然后提取二进制数据中的 Float 值,返回给调用者。

测试代码如下,验证 fire-and-forget 到 request/stream 的完整流程:

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient(); 
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

    // ... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

5.4. 通道(Channel)

通道模型支持双向通信,消息流可以在两个方向上异步流动。

我们通过一个简单的游戏模拟来测试。在这个游戏中,通道两端的玩家会随机发送消息给对方,对方则做出响应。

首先,在服务端添加处理方法:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}

该方法 requestChannel 同时处理输入和输出的 Payload。输入参数是一个来自客户端的 payload 流,每个 payload 会传给 gameController::processPayload 处理。

作为响应,我们返回一个新的 Flux 流给客户端,该流来自 gameController,它本身也是一个 Publisher

GameController 类大致如下:

public class GameController implements Publisher<Payload> {
    
    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        // 随机间隔发送 payload 消息给订阅者
    }

    public void processPayload(Payload payload) {
        // 响应对方玩家的消息
    }
}

GameController 被订阅时,它会开始发送消息。

客户端代码如下:

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

客户端和服务端类似,创建自己的 GameController 实例。

使用 socket.requestChannel() 发送 payload 流给服务端,服务端也会返回自己的 payload 流。

接收到的 payload 会传给 gameController::processPayload 处理。

在游戏模拟中,客户端和服务端是镜像的,双方都在发送和接收 payload 流

这些流是独立运行的,无需同步。

最后,运行测试:

@Test
public void whenRunningChannelGame_thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

6. 结论

本文介绍了 RSocket 提供的四种交互模型。建议访问 RSocket 官网 获取更深入的内容,特别是 FAQMotivations 文档,它们提供了很好的背景知识。


原始标题:Introduction to RSocket | Baeldung