跳到主要内容

SSE 研究

· 阅读需 10 分钟

基本知识

Server-Sent Events, 是一种基于 HTTP 协议的服务器推送技术. 只允许从服务端到客户端的单向推送.

  • 单向(服务端->客户端)
  • 自动重连
  • 基于 HTTP 协议

使用场景

  1. 实时通知系统
  2. 实时更新的动态数据
  3. 数据流展示
  4. 数据更新频繁
  5. 低延迟单向通信
  6. ChatGPT 的流式输出

简单代码

JS 通过 EventSource 对象与服务端建立连接

  1. new EventSource(url)
  2. onmessage
  3. onopen
  4. onerror
  5. close
const url = "xxxx";
// 1. 建立连接
const eventSource = new EventSource(url);

// 2. 消息事件
eventSource.addEventListener("message", this.handleMessage); // 还可以使用 onmessage 方法

// 3. 错误事件
eventSource.addEventListener("error", this.handleError); // 还可以使用 onerror 方法

// 4. 关闭 SSE 连接
eventSource.close();
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<MessageData>> streamFlux(@RequestParam("id") String id) {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence ->
ServerSentEvent.<MessageData>builder()
.id(id) // id
.event("periodic-event") // 事件
.retry(Duration.ofSeconds(30)) // 重连时间
.data(new MessageData( // 自定义数据
UUID.randomUUID().toString(),
"heartbeat",
LocalDateTime.now(),
Map.of("clientId", UUID.randomUUID(),
"status", "connected"
)))
.build()
)
.take(1)
;
}

基本原理

响应头

服务器会设置三个响应头:

  1. Context-Type: text/event-stream; charset=utf-8:响应的内容类型是事件流
  2. Cache-Control: no-cache:禁止缓存响应内容,确保客户端每次都能接收到最新的数据。
  3. Connection: keep-alive 保持 HTTP 连接打开, 以便服务器可以持续向客户端推送数据。

SSE 报文

服务端响应报文:

: 注释\n
id: xxx\n
retry: 5000\n
event:message\n
data: xxxx\n
\n
  1. 每行结束都要有一个\n
  2. : 注释
  3. id: 消息 id
  4. retry: 设置断线重连等待的时间
  5. event: 设置事件名
  6. data: 数据
  7. 空行表示消息结束

SSE 与 Websocket 对比

  • ws 是双向的, 因此长连接情况下 ws 数据传输效率会比 SSE 高.
  • 相对于 SSE 的 HTTP 协议, ws 协议的开销更大, 实现起来也更复杂.
  • ws 支持传输二进制, SSE 只支持 UTF-8 文本(二进制类格式可以通过 base64 编码成文本进行传输)
  • SSE 内置自动重连机制, ws 需要手动实现重连和心跳机制.

自动重连机制

当 HTTP 长连接因网络波动, 服务器重启或者其他异常断开时, EventSource 会自动在一段固定的间隔(未设置 retry 时,默认是 3000ms)后重新发起连接请求。

重连等待时间: 可以通过服务端响应的事件流设置 retry 字段告知客户端下次重连等待的时间。 如 retry: 10000, 等待 10s 再重新连接.

断点续传:Last-Event-ID, 如果发生了重连, 浏览器会把最近接收到的 id 通过 Last-Event-ID 请求头传给服务端.

异常处理,EventSource.onerror 可以监听异常事件。

断线重连的几种情况

  1. 浏览器端实现断线重连

通过监听 EventSource.onerror 事件来实现断线重连机制。

const source = new EventSource(url);

// 在 onerror 中实现断线重连
source.onerror = function (event) {
setTimeout(() => {
source = new EventSource(url);
}, 1000);
};
  1. 服务端主动要求客户端重连

服务器端可以通过发送特定事件(如 reconnect )来要求客户端重新连接。

// 服务器端设置重连延迟
emitter.send(SseEmitter.event()
.reconnectTime(5000L) // 5秒后重连
.data("数据"));

上面代码对应的报文

retry: 5000\n
data: 数据\n
\n
// 客户端
eventSource.addEventListener("reconnect", function (e) {
console.log("收到重连指令:", e.data);
eventSource.close(); // 关闭当前连接
setTimeout(() => {
connect(); // 重新连接
}, 5000);
});

执行这段代码后,客户端会先断开连接,然后在 5s 后重新连接。

  1. 异常情况导致断线

下面几种情况, 浏览器端在连接断开 5s 后会重新建立连接请求。

  • 网络故障
  • 服务器崩溃
  • 连接超时
  • 服务器主动断开(调用了 emitter.complete())

SSE 会无限次重连,直到成功或被手动关闭。

不会重连的情况

  1. 客户端手动调用 eventSource.close()
  2. 服务器返回 HTTP 204 No Content
  3. 服务器返回非 text/event-stream 的 Content-Type
  4. 服务区返回 HTTP 错误码(204, 401,403,404 等)

错误处理, 不同类型的错误需要使用不同的处理策略

  • 暂时性错误(重连)
  • 致命错误(需要向用户报告)

Authentication 问题

  1. 同源, 会直接携带 cookie

  2. 跨源, (处于同一主域名的情况下可以使用 withCredentials 来是跨源请求携带 cookie)

    const eventSource = new EventSource("https://api.myapp.com/sse", {withCredentials:true})
  3. 使用 npm event-source-polyfill 包提供的 EventSourcePolyfil 来携带请求头

    const headers = {
    "Authorization": "..." // somewhat can update this token when it's refreshed
    }
    const eventSource = new EventSourcePolyfill("/sse", { headers })

    需要注意的是, EventSourcePolyfil 会将 Last-Event-ID 作为请求参数传递,而不是请求头

注意事项

如果使用 nginx 配置需要将 proxy_buffering 关闭

proxy_buffering off;

设置响应头 x-accel-buffering 也可以让 nginx 不缓冲后端服务器的响应

x-accel-buffering: no

安全问题

  1. 防止 XSS 攻击: 一般来说, SSE 接收到数据大多数情况下会显示到前端, 因此需要防范 XSS 攻击

  2. 验证连接请求: 检查 Referer 头或使用身份验证令牌.

  3. 限制连接数据, 避免资源耗尽

    • 应用层面: 对每个连接计数

    • Nginx 层面: 限制每个 IP 的连接数

        # /etc/nginx/nginx.conf 或站点配置文件

      # limit_conn_zone 用于定义连接限制的存储区域
      # $binary_remote_addr 使用客户端 IP地址的二进制格式作为 key
      # sse_conn 共享内存区域的名称(这个是自定义的)
      # 10m 分配 10MB 内存,大概可以存储 160,000 个 IP 地址的连接状态
      limit_conn_zone $binary_remote_addr zone=sse_conn:10m;

      # 请求速率限制
      # rate=10r/s, 单 IP 每秒限制 10 个请求
      limit_req_zone $binary_remote_addr zone=sse_req:10m rate=10r/s;

      server {
      listen 80;
      server_name example.com;

      location /events {
      # 每个 IP 最多 10 个连接
      # limit_conn 应用连接限制指令
      # sse_conn 前面定义的共享内存区域
      # 10 每个 IP 最大连接数
      limit_conn sse_conn 10;

      # 请求速率限制
      # burst=20 令牌桶容量 20, 允许短时间内处理 20 个请求
      limit_req zone=sse_req burst=20;

      # SSE 特殊配置
      proxy_pass http://localhost:3000;
      proxy_set_header Connection '';
      proxy_http_version 1.1;
      chunked_transfer_encoding off;
      proxy_buffering off;
      proxy_cache off;
      proxy_read_timeout 24h;
      }
      }
    • 系统层面

      • 修改文件描述符 (默认是 1024)

        # 临时将当前 Shell 会话的文件描述符限制到 65535
        # -n 指定文件描述符(nofile)参数
        ulimit -n 65535

        # 永久修改 /etc/security/limits.conf
        echo "* soft nofile 65535" | sudo tee -a /etc/security/limits.conf # 软限制
        echo "* hard nofile 65535" | sudo tee -a /etc/security/limits.conf # 硬限制

        # 修改系统级别限制
        echo "fs.file-max = 2097152" | sudo tee -a /etc/sysctl. conf # 内核参数

        cat /proc/sys/fs/file-max # 查看系统限制
        cat /proc/sys/fs/file-nr # 查看当前使用情况

        sudo sysctl -w fs.file-max=2097152 # 直接修改(临时)
        sudo sysctl -p # 从文件加载(永久)
      • 优化 TCP 连接参数

        # /etc/sysctl.conf
        # 客户端连接请求 → [SYN队列] → [Accept队列] → accept()处理
        net.core.somaxconn = 4096 # 设置listen() 系统调用的最大连接队列长度
        net.ipv4.tcp_max_syn_backlog = 4096 # 设置 SYN 半连接队列的最大长度
        net.ipv4.ip_local_port_range = 1024 65535 # 可使用的临时端口范围
        net. ipv4.tcp_tw_reuse = 1 # 允许将处于 TIME_WAIT 状态的 socket 用于新的 TCP 连接
  4. 监控和日志记录

  5. 访问控制,只有授权用户才能接受敏感数据

性能优化

  1. 使用 gzip 对数据进行压缩
  2. 浏览器对同一网站打开的连接数(HTTP/1.1 6 个). 如果允许的话, 建议使用 HTTP/2 协议, 它支持更多并发连接(100+).
    • 需要避免每位用户的连接数量过多
  3. 引入心跳机制,定时发送 : heartbeat (以冒号开头表示在注释,会被 EventSource API 忽略)
  4. 优化数据降低带宽,只发送变更的数据
  5. 需要避免常见的资源泄露
    • 未关闭的数据库连接
    • 为已断开连接的客户端运行后台任务
    • 处理用户请求时分配的内存
    • 流式传输过程中创建的临时文件或缓冲区

参考文章