SSE 研究
基本知识
Server-Sent Events, 是一种基于 HTTP 协议的服务器推送技术. 只允许从服务端到客户端的单向推送.
- 单向(服务端->客户端)
- 自动重连
- 基于 HTTP 协议
使用场景
- 实时通知系统
- 实时更新的动态数据
- 数据流展示
- 数据更新频繁
- 低延迟单向通信
- ChatGPT 的流式输出
简单代码
JS 通过 EventSource 对象与服务端建立连接
- new EventSource(url)
- onmessage
- onopen
- onerror
- close
const url = "xxxx";
// 1. 建立连接
const eventSource = new EventSource(url);
// 2. 消息事件
eventSource.addEventListener("message", this.handleMessage); // 还可以使用 onmessage 方法
// 3. 错误事件
eventSource.addEventListener("error", this.handleError); // 还可以使用 onerror 方法
// 4. 关闭 SSE 连接
eventSource.close();
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<MessageData>> streamFlux(@RequestParam("id") String id) {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence ->
ServerSentEvent.<MessageData>builder()
.id(id) // id
.event("periodic-event") // 事件
.retry(Duration.ofSeconds(30)) // 重连时间
.data(new MessageData( // 自定义数据
UUID.randomUUID().toString(),
"heartbeat",
LocalDateTime.now(),
Map.of("clientId", UUID.randomUUID(),
"status", "connected"
)))
.build()
)
.take(1)
;
}
基本原理
响应头
服务器会设置三个响应头:
Context-Type: text/event-stream; charset=utf-8:响应的内容类型是事件流Cache-Control: no-cache:禁止缓存响应内容,确保客户端每次都能接收到最新的数据。Connection: keep-alive保持 HTTP 连接打开, 以便服务器可以持续向客户端推送数据。
SSE 报文
服务端响应报文:
: 注释\n
id: xxx\n
retry: 5000\n
event:message\n
data: xxxx\n
\n
- 每行结束都要有一个
\n :注释id: 消息 idretry: 设置断线重连等待的时间event: 设置事件名data: 数据- 空行表示消息结束
SSE 与 Websocket 对比
- ws 是双向的, 因此长连接情况下 ws 数据传输效率会比 SSE 高.
- 相对于 SSE 的 HTTP 协议, ws 协议的开销更大, 实现起来 也更复杂.
- ws 支持传输二进制, SSE 只支持 UTF-8 文本(二进制类格式可以通过 base64 编码成文本进行传输)
- SSE 内置自动重连机制, ws 需要手动实现重连和心跳机制.
自动重连机制
当 HTTP 长连接因网络波动, 服务器重启或者其他异常断开时, EventSource 会自动在一段固定的间隔(未设置 retry 时,默认是 3000ms)后重新发起连接请求。
重连等待时间: 可以通过服务端响应的事件流设置 retry 字段告知客户端下次重连等待的时间。 如 retry: 10000, 等待 10s 再重新连接.
断点续传:Last-Event-ID, 如果发生了重连, 浏览器会把最近接收到的 id 通过 Last-Event-ID 请求头传给服务端.
异常处理,EventSource.onerror 可以监听异常事件。
断线重连的几种情况
- 浏览器端实现断线重连
通过监听 EventSource.onerror 事件来实现断线重连机制。
const source = new EventSource(url);
// 在 onerror 中实现断线重连
source.onerror = function (event) {
setTimeout(() => {
source = new EventSource(url);
}, 1000);
};
- 服务端主动要求客户端重连
服务器端可以通过发送特定事件(如 reconnect )来要求客户端重新连接。
// 服务器端设置重连延迟
emitter.send(SseEmitter.event()
.reconnectTime(5000L) // 5秒后重连
.data("数据"));
上面代码对应的报文
retry: 5000\n
data: 数据\n
\n
// 客户端
eventSource.addEventListener("reconnect", function (e) {
console.log("收到重连指令:", e.data);
eventSource.close(); // 关闭当前连接
setTimeout(() => {
connect(); // 重新连接
}, 5000);
});
执行这段代 码后,客户端会先断开连接,然后在 5s 后重新连接。
- 异常情况导致断线
下面几种情况, 浏览器端在连接断开 5s 后会重新建立连接请求。
- 网络故障
- 服务器崩溃
- 连接超时
- 服务器主动断开(调用了
emitter.complete())
SSE 会无限次重连,直到成功或被手动关闭。
不会重连的情况
- 客户端手动调用 eventSource.close()
- 服务器返回 HTTP 204 No Content
- 服务器返回非 text/event-stream 的 Content-Type
- 服务区返回 HTTP 错误码(204, 401,403,404 等)
错误处理, 不同类型的错误需要使用不同的处理策略
- 暂时性错误(重连)
- 致命错误(需要向用户报告)
Authentication 问题
-
同源, 会直接携带 cookie
-
跨源, (处于同一主域名的情况下可以使用 withCredentials 来是跨源请求携带 cookie)
const eventSource = new EventSource("https://api.myapp.com/sse", {withCredentials:true}) -
使用 npm event-source-polyfill 包提供的 EventSourcePolyfil 来携带请求头
const headers = {
"Authorization": "..." // somewhat can update this token when it's refreshed
}
const eventSource = new EventSourcePolyfill("/sse", { headers })需要注意的是, EventSourcePolyfil 会将 Last-Event-ID 作为请求参数传递,而不是请求头
注意事项
如果使用 nginx 配置需要将 proxy_buffering 关闭
proxy_buffering off;
设置响应头 x-accel-buffering 也可以让 nginx 不缓冲后端服务器的响应
x-accel-buffering: no
安全问题
-
防止 XSS 攻击: 一般来说, SSE 接收到数据大多数情况下会显示到前端, 因此需要防范 XSS 攻击
-
验证连接请求: 检查 Referer 头或使用身份验证令牌.
-
限制连接数据, 避免资源耗尽
-
应用层面: 对每个连接计数
-
Nginx 层面: 限制每个 IP 的连接数
# /etc/nginx/nginx.conf 或站点配置文件
# limit_conn_zone 用于定义连接限制的存储区域
# $binary_remote_addr 使用客户端 IP地址的二进制格式作为 key
# sse_conn 共享内存区域的名称(这个是自定义的)
# 10m 分配 10MB 内存,大概可以存储 160,000 个 IP 地址的连接状态
limit_conn_zone $binary_remote_addr zone=sse_conn:10m;
# 请求速率限制
# rate=10r/s, 单 IP 每秒限制 10 个请求
limit_req_zone $binary_remote_addr zone=sse_req:10m rate=10r/s;
server {
listen 80;
server_name example.com;
location /events {
# 每个 IP 最多 10 个连接
# limit_conn 应用连接限制指令
# sse_conn 前面定义的共享内存区域
# 10 每个 IP 最大连接数
limit_conn sse_conn 10;
# 请求速率限制
# burst=20 令牌桶容量 20, 允许短时间内处理 20 个请求
limit_req zone=sse_req burst=20;
# SSE 特殊配置
proxy_pass http://localhost:3000;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 24h;
}
} -
系统层面
-
修改文件描述符 (默认是 1024)
# 临时将当前 Shell 会话的文件描述符限制到 65535
# -n 指定文件描述符(nofile)参数
ulimit -n 65535
# 永久修改 /etc/security/limits.conf
echo "* soft nofile 65535" | sudo tee -a /etc/security/limits.conf # 软限制
echo "* hard nofile 65535" | sudo tee -a /etc/security/limits.conf # 硬限制
# 修改系统级别限制
echo "fs.file-max = 2097152" | sudo tee -a /etc/sysctl. conf # 内核参数
cat /proc/sys/fs/file-max # 查看系统限制
cat /proc/sys/fs/file-nr # 查看当前使用情况
sudo sysctl -w fs.file-max=2097152 # 直接修改(临时)
sudo sysctl -p # 从文件加载(永久) -
优化 TCP 连接参数
# /etc/sysctl.conf
# 客户端连接请求 → [SYN队列] → [Accept队列] → accept()处理
net.core.somaxconn = 4096 # 设置listen() 系统调用的最大连接队列长度
net.ipv4.tcp_max_syn_backlog = 4096 # 设置 SYN 半连接队列的最大长度
net.ipv4.ip_local_port_range = 1024 65535 # 可使用的临时端口范围
net. ipv4.tcp_tw_reuse = 1 # 允许将处于 TIME_WAIT 状态的 socket 用于新的 TCP 连接
-
-
-
监控和日志记录
-
访问控制,只有授权用户才能接受敏感数据
性能优化
- 使用 gzip 对数据进行压缩
- 浏览器对同一网站打开的连接数(HTTP/1.1 6 个). 如果允许的话, 建议使用 HTTP/2 协议, 它支持更多并发连接(100+).
- 需要避免每位用户的连接数量过多
- 引入心跳机制,定时发送
: heartbeat(以冒号开头表示在注释,会被 EventSource API 忽略) - 优化数据降低带宽,只发送变更的数据
- 需要避免常见的资源泄露
- 未关闭的数据库连接
- 为已断开连接的客户端运行后台任务
- 处理用户请求时分配的内存
- 流式传输过程中创建的临时文件或缓冲区