1. 概述

Server-Sent-Events(简称SSE,中文翻译为服务器推送事件)是一种HTTP标准,用于向客户端推送实时数据,例如chatGPT中流式消息就是使用的SSE协议。与 WebSocket 不同的是,SSE 是基于 HTTP 协议实现的,并且只支持服务器到客户端的单向通信。

本教程中,我们将学习如何使用 Spring 分别实现基于SSE 的Server和Client端。

2. 方法一,使用Webflux实现SSE

Spring 4.2 开始就已经支持SSE,从Spring 5开始我们可以使用WebFlux 更优雅的实现SSE协议。

2.1. 通过Flux实现SSE接口

下面我们创建一个SSE的接口,每隔一秒向客户端输出当前时间。我们可以利用Reactor库提供的Flux类实现。

需要遵守W3C规范,将MIME类型设为text/event-stream

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

有关FluxReactor Core的更多信息,请参考这篇文章

2.2. 使用 ServerSentEvent

上面例子很简单,如果想要自定义事件。我们就需要使用到 ServerSentEvent:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}

我们可以看到,使用ServerSentEvent有以下几点优势:

  1. 我们可以直接操作事件元数据
  2. 可以省略 text/event-stream 类型声明。

本例中,我们指定了事件ID、事件名称以及具体的数据内容。

此外,我们还可以添加comments属性和retry值,用于指定重连时间,以便在尝试发送事件时使用。

2.3. 发送SSE请求

要发送SSE请求,消费事件流,可以使用 WebClient 实现。

使用 WebClient 前,我们需要引入 webflux 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

subscribe方法允许我们在成功接收到事件时、遇到错误时以及流完成时指定操作。

在示例中,我们使用了retrieve方法,这是一种获取响应体的简单直接方式。

如果收到4xx或5xx响应,该方法会自动抛出WebClientResponseException,除非我们添加onStatus语句来处理这些情况。

另一方面,我们也可以使用exchange方法,它提供了对ClientResponse的访问,即使响应失败也不会抛出异常。

如果我们不需要事件元数据,可以跳过ServerSentEvent包装。

3. 方法二、使用Spring MVC实现SSE接口

前面我们提到Spring 4.2起就开始支持SSE规范,当时引入了SseEmitter类。下面是一个简单示例:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

想要进一步学习Spring MVC中SSE的应用示例,请阅读这篇文章

4. 深入理解服务器推送事件

了解了如何实现SSE接口后,让我们深入了解一下其背后的原理。

SSE是一种被大多数浏览器采纳的规范,允许任何时间双向推送事件流。

“事件” 是以纯文本格式通过持续的 HTTP 响应传输的,包含如下字段:

  • event: 事件类型,默认为 "message"。
  • data: 数据内容
  • id: 事件的唯一标识符
  • retry: 重连时间间隔,告知客户端在断开连接时应等待多少毫秒后重试连接。

规范并未限制数据负载格式,我们可以使用简单的String或复杂的JSON或XML结构。

最后一点需要考虑的是,SSE流式与WebSocket通信的区别。

虽然WebSocket提供服务器和客户端之间的全双工(双向)通信,而SSE则使用单向通信。

另外,WebSocket不是一个HTTP协议,与SSE相反,它不提供标准的错误处理机制。

5. 总结

本文介绍了SSE流式的主要概念,并提供了简单示例,这些示例源码可以在我们的Github仓库中找到:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive-2。