1. 概述
Server-Sent-Events简称SSE,中文翻译为服务器推送事件,是一种HTTP标准。传统上,浏览器必须向服务器发送请求才能接受新数据,借助 SSE 服务器可以随时向客户端推送新数据。
本教程,我们将学习如何使用 Spring 实现基于服务器推送事件的接口。
Spring 4.2版本就已经支持SSE,但从Spring 5开始,我们有了更符合语义和方便的方式来处理它。
2. 使用Spring 6 Webflux实现SSE
2.1. 通过Flux处理流式事件
我们可以利用Reactor库提供的Flux
类或其他可能的实现,来控制事件元数据。
Flux
是对事件流的响应式表示——它的处理方式取决于指定的请求或响应媒体类型。
创建一个SSE流式端点时,我们需要遵循W3C规范,将MIME类型设为text/event-stream
:
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
interval
方法会生成一个递增地输出long
值的Flux
。然后我们将这些值映射到我们期望的输出。
让我们启动应用并尝试访问该端点。
我们将观察浏览器如何响应服务器每秒推送的事件。有关Flux
和Reactor Core
的更多信息,请参考:/reactor-core。
2.2. 利用ServerSentEvent
元素
现在,我们将我们的输出String
包装成ServerSentEvent
对象,看看这样做的好处:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
我们可以看到,使用ServerSentEvent
实体有以下几点优势:
- 我们可以处理事件元数据,这对于实际场景至关重要。
- 可以省略“text/event-stream”媒体类型声明。
在这个例子中,我们指定了一个id
、事件名称以及最重要的事件实际数据。
此外,我们还可以添加comments
属性和retry
值,用于指定重连时间,以便在尝试发送事件时使用。
2.3. 通过WebClient消费服务器推送事件
现在,让我们使用WebClient
来消费事件流:
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
subscribe
方法允许我们在成功接收到事件时、遇到错误时以及流完成时指定操作。
在示例中,我们使用了retrieve
方法,这是一种获取响应体的简单直接方式。
如果收到4xx或5xx响应,该方法会自动抛出WebClientResponseException
,除非我们添加onStatus
语句来处理这些情况。
另一方面,我们也可以使用exchange
方法,它提供了对ClientResponse
的访问,即使响应失败也不会抛出异常。
如果我们不需要事件元数据,可以跳过ServerSentEvent
包装。
3. 在Spring MVC中实现SSE
如前所述,SSE规范自Spring 4.2起就得到了支持,当时引入了SseEmitter
类。
简单来说,我们会定义一个ExecutorService
,一个线程,SseEmitter
将在其中工作,推送数据,并返回事件发射器实例,以此保持连接打开:
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
请确保根据您的应用场景选择合适的ExecutorService
。
有关Spring MVC中的SSE和更多示例,请阅读这篇文章。
4. 深入理解服务器推送事件
了解了如何实现SSE接口后,让我们深入了解一下其背后的原理。
SSE是一种被大多数浏览器采纳的规范,允许任何时间双向推送事件流。
“事件”只是遵循规范定义的UTF-8编码文本数据的流。
这种格式由一系列键值对(如id、retry、data和event(表示名称))分隔行构成,支持注释。
规范并未限制数据负载格式,我们可以使用简单的String
或复杂的JSON或XML结构。
最后一点需要考虑的是,SSE流式与WebSocket通信的区别。
虽然WebSocket提供服务器和客户端之间的全双工(双向)通信,而SSE则使用单向通信。
另外,WebSocket不是一个HTTP协议,与SSE相反,它不提供标准的错误处理机制。
5. 总结
总之,本文介绍了SSE流式的主要概念,无疑这是构建下一代系统的宝贵资源。现在我们对使用这个协议时发生的事情有了更深入的理解。
此外,我们还通过一些简单示例进行了补充,这些示例可以在我们的Github仓库中找到:https://github.com/eugenp/tutorials/tree/master/spring-reactive-modules/spring-reactive-2。