1 场景
AI对话项目使用SSE通信,如果用户刷新 / 关闭页面,后端不清理连接,就可能导致 资源泄露。
2. 解决方案
1. 非零超时+ 回调清理(必做)
1. 非零超时
SseEmitter
默认 0L
表示永不超时,最容易导致僵尸连接。
- 设置合理的超时时间(比如 30s~5min),避免连接永远挂在服务器上。
- 超时会触发
onTimeout
→ 你就能清理 session。
1 2
| SseEmitter emitter = new SseEmitter(TimeUnit.MINUTES.toMillis(5));
|
2. 回调清理
- 注册
onCompletion
、onTimeout
、onError
,确保一旦连接断开就清理资源。
- 这样即使异常关闭,也能释放
EventSource
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import com.fasterxml.jackson.databind.ObjectMapper; import lombok.NoArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import okhttp3.sse.EventSource; import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@Slf4j @Component @NoArgsConstructor public class ChatSSEEventSourceListener extends BaseSSEEventSourceListener {
private static final ChatSessionManager chatSessionManager = SpringUtils.getBean(ChatSessionManager.class);
public ChatSSEEventSourceListener(SseEmitter emitter, Long userId, Long sessionId) { super(emitter, userId, sessionId);
emitter.onCompletion(() -> { log.info("SSE 完成,释放资源,sessionId={}", sessionId); chatSessionManager.cancelSession(sessionId); }); emitter.onTimeout(() -> { log.warn("SSE 超时,释放资源,sessionId={}", sessionId); chatSessionManager.cancelSession(sessionId); }); emitter.onError((ex) -> { log.error("SSE 出错,释放资源,sessionId={}", sessionId, ex); chatSessionManager.cancelSession(sessionId); }); } }
|
3. 心跳机制
有些浏览器直接关闭页面时,TCP 连接不会立刻触发 onError
,所以需要主动检测。因为SSE 的设计初衷是 “服务端向客户端单向推送数据”,所以连接的生命周期由服务端主导,也就是需要后端去发心跳
后端:定时发心跳
1 2 3 4 5 6 7 8 9
| ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(() -> { try { emitter.send(SseEmitter.event().comment("heartbeat")); } catch (IOException e) { log.warn("SSE 心跳失败,关闭 sessionId={}", sessionId); emitter.completeWithError(e); } }, 15, 15, TimeUnit.SECONDS);
|
前端:监听断开并自动重连
1 2 3 4 5 6 7 8 9 10
| function connectSSE() { const evtSource = new EventSource("/sse/chat"); evtSource.onopen = () => console.log("SSE connected"); evtSource.onerror = () => { console.log("SSE error, reconnecting..."); evtSource.close(); setTimeout(connectSSE, 2000); }; } connectSSE();
|
这样前端挂了,后端心跳失败,就能立即释放资源。
4. SessionManager 清理机制
你现在有个 ChatSessionManager
,里面存着 ConcurrentHashMap<Long, EventSource>
。
要避免泄露,可以:
- 在
cancelSession
里一定要 remove
。
- 增加一个 后台清理线程(比如每 5 分钟扫一遍),把长时间没活跃的连接干掉。
示例:
1 2 3 4 5 6 7 8 9 10
| @Scheduled(fixedDelay = 300000) public void cleanupStaleSessions() { sessions.forEach((sessionId, eventSource) -> { eventSource.cancel(); sessions.remove(sessionId); log.info("清理过期 SSE 连接 sessionId={}", sessionId); }); }
|
5. 如果并发量特别大 → 考虑 WebSocket
- SSE 的线程模型是 长轮询风格,高并发下对 Tomcat/Jetty 压力较大。
- WebSocket 本身就支持双向心跳,连接状态更可控。
- 如果用户量很大(10w+),可以考虑转成 WebSocket,或者放在 Nginx + SSE 反向代理层做断线管理。