1. 概述
Server-Sent-Events(简称SSE,中文翻译为服务器推送事件)是一种HTTP标准,用于向客户端推送实时数据,例如chatGPT中流式消息就是使用的SSE协议。与 WebSocket 不同的是,SSE 是基于 HTTP 协议实现的,并且只支持服务器到客户端的单向通信。
本教程中,我们将学习如何使用 Spring 分别实现基于SSE 的Server和Client端。
2. 方法一,使用Webflux实现SSE
Spring 4.2 开始就已经支持SSE,从Spring 5开始我们可以使用WebFlux 更优雅的实现SSE协议。
2.1. 通过Flux实现SSE接口
下面我们创建一个SSE的接口,每隔一秒向客户端输出当前时间。我们可以利用Reactor库提供的Flux
类实现。
需要遵守W3C规范,将MIME类型设为text/event-stream
:
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
有关Flux
和Reactor Core
的更多信息,请参考这篇文章。
2.2. 使用 ServerSentEvent
上面例子很简单,如果想要自定义事件。我们就需要使用到 ServerSentEvent
:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
我们可以看到,使用ServerSentEvent
有以下几点优势:
- 我们可以直接操作事件元数据
- 可以省略
text/event-stream
类型声明。
本例中,我们指定了事件ID、事件名称以及具体的数据内容。
此外,我们还可以添加comments
属性和retry
值,用于指定重连时间,以便在尝试发送事件时使用。
2.3. 发送SSE请求
要发送SSE请求,消费事件流,可以使用 WebClient
实现。
使用 WebClient
前,我们需要引入 webflux
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
subscribe
方法允许我们在成功接收到事件时、遇到错误时以及流完成时指定操作。
在示例中,我们使用了retrieve
方法,这是一种获取响应体的简单直接方式。
如果收到4xx或5xx响应,该方法会自动抛出WebClientResponseException
,除非我们添加onStatus
语句来处理这些情况。
另一方面,我们也可以使用exchange
方法,它提供了对ClientResponse
的访问,即使响应失败也不会抛出异常。
如果我们不需要事件元数据,可以跳过ServerSentEvent
包装。
3. 方法二、使用Spring MVC实现SSE接口
前面我们提到Spring 4.2起就开始支持SSE规范,当时引入了SseEmitter
类。下面是一个简单示例:
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
想要进一步学习Spring MVC中SSE的应用示例,请阅读这篇文章。
4. 深入理解服务器推送事件
了解了如何实现SSE接口后,让我们深入了解一下其背后的原理。
SSE是一种被大多数浏览器采纳的规范,允许任何时间双向推送事件流。
“事件” 是以纯文本格式通过持续的 HTTP 响应传输的,包含如下字段:
- event: 事件类型,默认为 "message"。
- data: 数据内容
- id: 事件的唯一标识符
- retry: 重连时间间隔,告知客户端在断开连接时应等待多少毫秒后重试连接。
规范并未限制数据负载格式,我们可以使用简单的String
或复杂的JSON或XML结构。
最后一点需要考虑的是,SSE流式与WebSocket通信的区别。
虽然WebSocket提供服务器和客户端之间的全双工(双向)通信,而SSE则使用单向通信。
另外,WebSocket不是一个HTTP协议,与SSE相反,它不提供标准的错误处理机制。
5. 总结
本文介绍了SSE流式的主要概念,并提供了简单示例,这些示例源码可以在我们的Github仓库中找到:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive-2。