1. 概述

Server-Sent-Events简称SSE,中文翻译为服务器推送事件,是一种HTTP标准。传统上,浏览器必须向服务器发送请求才能接受新数据,借助 SSE 服务器可以随时向客户端推送新数据。

本教程,我们将学习如何使用 Spring 实现基于服务器推送事件的接口。

Spring 4.2版本就已经支持SSE,但从Spring 5开始,我们有了更符合语义和方便的方式来处理它

2. 使用Spring 6 Webflux实现SSE

2.1. 通过Flux处理流式事件

我们可以利用Reactor库提供的Flux类或其他可能的实现,来控制事件元数据。

Flux是对事件流的响应式表示——它的处理方式取决于指定的请求或响应媒体类型。

创建一个SSE流式端点时,我们需要遵循W3C规范,将MIME类型设为text/event-stream

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

interval方法会生成一个递增地输出long值的Flux。然后我们将这些值映射到我们期望的输出。

让我们启动应用并尝试访问该端点。

我们将观察浏览器如何响应服务器每秒推送的事件。有关FluxReactor Core的更多信息,请参考:/reactor-core。

2.2. 利用ServerSentEvent元素

现在,我们将我们的输出String包装成ServerSentEvent对象,看看这样做的好处:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}

我们可以看到,使用ServerSentEvent实体有以下几点优势:

  1. 我们可以处理事件元数据,这对于实际场景至关重要。
  2. 可以省略“text/event-stream”媒体类型声明。

在这个例子中,我们指定了一个id、事件名称以及最重要的事件实际数据。

此外,我们还可以添加comments属性和retry值,用于指定重连时间,以便在尝试发送事件时使用。

2.3. 通过WebClient消费服务器推送事件

现在,让我们使用WebClient来消费事件流:

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

subscribe方法允许我们在成功接收到事件时、遇到错误时以及流完成时指定操作。

在示例中,我们使用了retrieve方法,这是一种获取响应体的简单直接方式。

如果收到4xx或5xx响应,该方法会自动抛出WebClientResponseException,除非我们添加onStatus语句来处理这些情况。

另一方面,我们也可以使用exchange方法,它提供了对ClientResponse的访问,即使响应失败也不会抛出异常。

如果我们不需要事件元数据,可以跳过ServerSentEvent包装。

3. 在Spring MVC中实现SSE

如前所述,SSE规范自Spring 4.2起就得到了支持,当时引入了SseEmitter类。

简单来说,我们会定义一个ExecutorService,一个线程,SseEmitter将在其中工作,推送数据,并返回事件发射器实例,以此保持连接打开:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

请确保根据您的应用场景选择合适的ExecutorService

有关Spring MVC中的SSE和更多示例,请阅读这篇文章

4. 深入理解服务器推送事件

了解了如何实现SSE接口后,让我们深入了解一下其背后的原理。

SSE是一种被大多数浏览器采纳的规范,允许任何时间双向推送事件流。

“事件”只是遵循规范定义的UTF-8编码文本数据的流。

这种格式由一系列键值对(如id、retry、data和event(表示名称))分隔行构成,支持注释。

规范并未限制数据负载格式,我们可以使用简单的String或复杂的JSON或XML结构。

最后一点需要考虑的是,SSE流式与WebSocket通信的区别。

虽然WebSocket提供服务器和客户端之间的全双工(双向)通信,而SSE则使用单向通信。

另外,WebSocket不是一个HTTP协议,与SSE相反,它不提供标准的错误处理机制。

5. 总结

总之,本文介绍了SSE流式的主要概念,无疑这是构建下一代系统的宝贵资源。现在我们对使用这个协议时发生的事情有了更深入的理解。

此外,我们还通过一些简单示例进行了补充,这些示例可以在我们的Github仓库中找到:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive-2。