一、SseEmitter是什么?
SseEmitter是Spring生态中对Server-Sent Events(SSE,服务器发送事件)的Java封装实现,主要用于Spring MVC/Spring Boot项目中快速实现服务器到客户端的单向实时消息推送,无需客户端反复发起轮询请求。它本质是对SSE协议的Java化封装,专门用于标准化服务器推送的消息格式,简化开发流程,属于Spring对W3C SSE标准的便捷实现,并非全新的通信协议。
注意:SSE是浏览器原生支持的标准协议,SseEmitter只是Spring对该协议的便捷实现,并非全新的通信协议。
二、完整工作流程
1. 客户端发起连接
浏览器通过原生EventSource API向服务器发送GET请求,请求头会携带Accept: text/event-stream、Cache-Control: no-cache等标识,表明这是一个SSE长连接请求。
2. 服务器初始化连接
Spring MVC接收到请求后,开发者创建SseEmitter实例(通常会手动设置超时时间,避免空闲连接占用资源),直接返回该实例。此时Servlet容器会启用异步请求处理,挂起当前HTTP连接,不会立即关闭请求。
3. 服务器主动推送数据
开发者通过SseEmitter.send(Object data)方法推送数据,Spring会自动将数据封装为符合SSE标准的事件格式,通过持久化的HTTP连接发送给客户端。
4. 连接终止
- 服务器可主动调用
SseEmitter.complete()正常关闭连接; - 若发生异常,可调用
SseEmitter.completeWithError(Throwable ex)终止连接并抛出异常; - 客户端可主动调用
EventSource.close()断开连接; - 超时或网络异常时,连接会自动断开,浏览器会自动重试建立连接。
三、底层原理
- 基于HTTP1.1长连接
SSE本质是复用HTTP协议的长连接,不需要额外的协议升级,服务器通过保持响应不结束来持续推送数据。 - Spring异步请求支持
当控制器返回SseEmitter时,Spring MVC不会立即提交响应,而是将请求挂起,释放容器线程,避免阻塞服务器资源。 - 标准化协议封装
SseEmitter会自动为响应添加强制头:Content-Type: text/event-stream:声明响应为SSE流格式Cache-Control: no-cache:禁用缓存,保证实时性Connection: keep-alive:保持连接不主动断开
- 消息格式约束
推送的消息必须遵循SSE协议规范:每条消息以data:开头,以\n\n作为结束标记,支持额外的事件名、ID等元数据。
四、与WebSocket的区别
两者都是实时通信技术,但适用场景和特性差异明显,对比如下:
| 维度 | SseEmitter(SSE) | WebSocket |
|---|---|---|
| 协议底层 | 基于标准HTTP 1.1长连接,复用HTTP端口 | 需通过HTTP握手升级为独立ws/wss协议,脱离HTTP原生能力 |
| 通信方向 | 单工:仅服务器可主动推送数据 | 全双工:客户端/服务器均可主动发送消息 |
| 数据类型 | 默认仅支持纯文本,二进制需Base64编码 | 原生支持文本、二进制(字节数组、Blob等),传输效率更高 |
| 自动重连 | 浏览器原生支持自动重试连接 | 需开发者手动监听close事件实现重连 |
| 兼容性 | 仅Edge/IE12+支持,现代浏览器全兼容 | IE10+及所有现代浏览器均支持,兼容性更好 |
| 并发连接限制 | HTTP/1.1下单域名最多6个长连接,HTTP/2无限制 | 无并发连接限制 |
| 适用场景 | 单向推送:系统通知、监控面板、实时日志 | 双向交互:即时聊天、直播弹幕、协同编辑 |
五、使用建议与快速示例
如果仅需要服务器单向主动推送数据,优先选择SseEmitter(开发更简单、通信开销更低);如果需要客户端和服务器双向交互,则必须使用WebSocket。
后端Spring Boot代码示例
@RestController
@RequestMapping("/sse")
public class SseController {
// 存储所有活跃的SSE连接,用于批量推送
private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();
@GetMapping("/push")
public SseEmitter connect() {
SseEmitter emitter = new SseEmitter(60_000L); // 设置60秒超时
emitters.add(emitter);
// 监听连接关闭,自动移除失效的emitter
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onError(e -> emitters.remove(emitter));
return emitter;
}
// 批量推送消息
public void broadcast(String message) {
emitters.forEach(emitter -> {
try {
emitter.send(message, MediaType.TEXT_PLAIN);
} catch (IOException e) {
emitter.completeWithError(e);
emitters.remove(emitter);
}
});
}
}
前端客户端代码示例
const eventSource = new EventSource("http://localhost:8080/sse/push");
eventSource.onmessage = (e) => {
console.log("收到服务器推送消息:", e.data);
};
eventSource.onerror = (err) => {
console.error("SSE连接断开,将自动重试");
};
评论区