对服务器发送事件(SSE)的简单研究和应用
了解契机
最近在做B2C项目的时候碰到一个场景: 在订单某一流程下, 需要顾客对商家出示二维码, 然后商家扫码以调用后端提供的接口修改订单状态以推进服务流程的进行。
现在的问题是: 修改订单的接口是由商家调用的, 那么商家端可以由接口的 resp
很容易地知道订单状态改变是否成功, 而顾客端仅仅是出示了一个二维码, 它该如何从服务端知道订单的状态是否被改变成功, 可以进行服务流程的下一步了呢?
Spring 框架下服务端主动推送消息的技术
据我在网上调研, 得到的方式有以下几种:
WebSocket
: 支持双向通信, 全双工、低延迟SSE(Server-Sent Events)
: 基于HTTP
, 由服务端单向向客户端发送信息短轮询
: 客户端基于一定时间间隔反复调用接口长轮询
: 客户端发出请求,服务器“挂起”请求直到有数据再响应
短轮询和长轮询肯定不考虑, 下面主要考虑 WebSocket
与 SSE
的优缺点:
WebSocket
是支持双向通信的, 实时性强、适合频繁交互 (如聊天、协作、游戏), 但实现起来稍微有点复杂, 因为需要管理连接(如心跳管理), 且对于用户鉴权部分需要重新编写(WebSocket
由HTTP
升级而来, 因而需要对请求头等部分重新处理)SSE
只支持服务端向客户端的单向推送消息, 但实现较为简单(Servlet
原生支持) 且其本身是基于HTTP
的连接, 可以在原有的鉴权基础上直接使用
SSE 的使用
spring-boot-starter-web
依赖, Servlet
对 SSE
直接支持
编写
引入依赖:
1 |
|
具体编写
编写测试接口需要注意以下几个方面:
- 返回的类必须是
SseEmitter
, 不可以再用其他包装类包装返回 Thread.sleep(10000)
表示返回数据的事件间隔- 在
SseEmitter.event().data()
中的是需要返回的对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@RestController
public class ServletSseController {
@GetMapping("/sse-blocking")
public SseEmitter ServletSse() {
SseEmitter emitter = new SseEmitter();
new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(10000);
emitter.send(SseEmitter.event().data("Data"));
}
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
测试接口返回
至此, 一个 SSE
的接口就完成了接下来是对性能方面的探究
接口性能
Servlet
下的 SSE
接口是 阻塞式 的, 这就意味着在高并发下它很容易触发 OOM, 因为 Servlet
本身的同步模式[1]。 直到3.0版本开始才引入了异步机制!
在实际开发过程中, 向上面编写的每次直接 new 一个线程显然是不合适的, 按照一般方式引入线程池, 设置主要参数: CorePoolSize
, MaxPoolSize
, QueueCapacity
, RejectedExecutionHandler
等, 由此引发的问题是: 在这个线程池提供的线程数量固定, 而接口又是阻塞式的情况下, 并发量貌似低的有点离谱…
使用 ThreadPoolExecutor.CallerRunsPolicy()
(提交任务的线程(也就是调用 execute() 的线程)自己来执行这个任务) 的拒绝策略会让被拒绝的任务由容器 tomcat
的工作线程执行…这是不能接受的! 由此, 设想使用异步的框架 WebFlux
WebFlux 实现 SSE 接口
Spring WebFlux
是 Spring Framework 5.0
中引入的新的响应式web框架。与Spring MVC不同,它不需要Servlet API,是完全异步且非阻塞的,并且通过Reactor项目实现了Reactive Streams规范[2]。使用也很简单
依赖:
1 |
|
编写接口:
1 |
|
接口测试:
接口对比
基于servlet的sse接口
1 |
|
基于webflux的sse接口
1 |
|
测试接口
1 |
|
在 application.properties
中限制Tomcat进程数量:
1 |
|
阻塞式sse的测试类
1 |
|
结果:
异步非阻塞式SSE测试:
1 |
|
踩坑记录
- 由于原本项目中使用的的是Tomcat9作为容器, 是支持同时在使用servlet的同时使用webflux框架的. servlet3.1及以上才支持异步, 并不是指
servlet api
的版本要在3.1以上, 而是容器Tomcat
支持3.1及以上就行 - 当
webflux
请求进入Tomcat
时, 实际还是使用的Servlet容器
入口, 只是不占用tomcat的工作线程。
Tomcat传统上使用基于线程池的请求处理模型,而不是事件驱动的异步I/O模型。当请求标记为异步时(例如async-supported=true),Tomcat会将其分配到 一个独立的线程池 中处理。
sequenceDiagram
participant Client
participant Tomcat
participant ServletHttpHandlerAdapter
participant WebFlux (Reactor)
participant ServletOutputStream
Client->>Tomcat: 发送HTTP请求 (SSE)
Tomcat->>ServletHttpHandlerAdapter: 调用 service()
ServletHttpHandlerAdapter->>ServletHttpHandlerAdapter: request.startAsync()
ServletHttpHandlerAdapter->>WebFlux (Reactor): httpHandler.handle(request, response)
WebFlux (Reactor)->>WebFlux (Reactor): 构建响应式流 (Flux)
WebFlux (Reactor)->>ServletOutputStream: 注册 WriteListener (onWritePossible)
ServletHttpHandlerAdapter->>Tomcat: 返回 (线程释放)
Note over Tomcat: 请求线程释放,不阻塞
loop 每次输出可写
WebFlux (Reactor)->>ServletOutputStream: 触发 onWritePossible()
ServletOutputStream->>Client: 写入响应数据
end
WebFlux (Reactor)->>ServletOutputStream: Flux 完成,关闭响应流
ServletOutputStream->>Tomcat: asyncContext.complete()
Tomcat->>Client: 响应完成