跳到主要内容

SSE 技术

· 阅读需 11 分钟

基本知识

Server-Sent Events, 是一种基于 HTTP 协议的服务器推送技术. 只允许从服务端到客户端的单向推送.

  • 单向(服务端->客户端)
  • 自动重连
  • 基于 HTTP 协议
  • 浏览器支持比较好

使用场景

  1. 实时通知系统
  2. 实时更新的动态数据
  3. 数据流展示
  4. 数据更新频繁
  5. 低延迟单向通信
  6. ChatGPT 的流式输出

简单代码

JS 通过 EventSource 对象与服务端建立连接

  1. new EventSource(url)
  2. onmessage
  3. onopen
  4. onerror
  5. close
const url = "xxxx";
// 1. 建立连接
const eventSource = new EventSource(url);

// 2. 消息事件
eventSource.addEventListener("message", this.handleMessage); // 还可以使用 onmessage 方法

// 3. 错误事件
eventSource.addEventListener("error", this.handleError); // 还可以使用 onerror 方法

// 4. 关闭 SSE 连接
eventSource.close();
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<MessageData>> streamFlux(@RequestParam("id") String id) {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence ->
ServerSentEvent.<MessageData>builder()
.id(id) // id
.event("periodic-event") // 事件
.retry(Duration.ofSeconds(30)) // 重连时间
.data(new MessageData( // 自定义数据
UUID.randomUUID().toString(),
"heartbeat",
LocalDateTime.now(),
Map.of("clientId", UUID.randomUUID(),
"status", "connected"
)))
.build()
)
.take(1)
;
}

基本原理

响应头

服务器会设置三个响应头:

  1. Context-Type: text/event-stream; charset=utf-8:响应的内容类型是事件流
  2. Cache-Control: no-cache:禁止缓存响应内容,确保客户端每次都能接收到最新的数据。
  3. Connection: keep-alive 保持 HTTP 连接打开, 以便服务器可以持续向客户端推送数据。

如果使用到了 Nginx, 一般还会加上 x-accel-buffering: no, 让 Nginx 不缓冲后端服务器的响应

SSE 报文

服务端响应报文:

: 注释\n
id: xxx\n
retry: 5000\n
event:message\n
data: xxxx\n
\n
  1. 每行结束都要有一个\n
  2. : 注释
  3. id: 消息 id
  4. retry: 设置断线重连等待的时间
  5. event: 设置事件名
  6. data: 数据
  7. 空行表示消息结束

SSE 与 Websocket 对比

  • ws 是双向的, 因此长连接情况下 ws 数据传输效率会比 SSE 高.
  • 相对于 SSE 的 HTTP 协议, ws 协议的开销更大, 实现起来也更复杂.
  • ws 支持传输二进制, SSE 只支持 UTF-8 文本(二进制类格式可以通过 base64 编码成文本进行传输)
  • SSE 内置自动重连机制, ws 需要手动实现重连和心跳机制.

自动重连机制

当 HTTP 长连接因网络波动, 服务器重启或者其他异常断开时, EventSource 会自动在一段固定的间隔(未设置 retry 时,默认是 3000ms)后重新发起连接请求。

重连等待时间: 可以通过服务端响应的事件流设置 retry 字段告知客户端下次重连等待的时间。 如 retry: 10000, 等待 10s 再重新连接.

断点续传:Last-Event-ID, 如果发生了重连, 浏览器会把最近接收到的 id 通过 Last-Event-ID 请求头传给服务端. 服务端可以根据这个字段续传, 避免数据丢失.

异常处理,EventSource.onerror 可以监听异常事件。

断线重连的几种情况

  1. EventSource 自动重连
const source = new EventSource(url);

在出现异常情况之后, 客户端会自动重连.

下面几种情况, 浏览器端在连接断开后会重新建立连接请求。

  • 网络故障
  • 服务器崩溃
  • 连接超时
  • 服务器主动断开(调用了 emitter.complete())

SSE 会无限次重连,直到成功或被手动关闭。

不会重连的情况

  1. 客户端手动调用 eventSource.close()
  2. 服务器返回 HTTP 204 No Content
  3. 服务器返回非 text/event-stream 的 Content-Type
  4. 服务区返回 HTTP 错误码(204, 401,403,404 等)

错误处理, 不同类型的错误需要使用不同的处理策略

  • 暂时性错误(重连)
  • 致命错误(需要向用户报告)
  1. 服务端控制重连的时机

服务端通过设置 retry 来控制客户端在断连之后等待重连的时间.

  1. 异常情况下的手动重连

通过监听 EventSource.onerror 事件来实现断线重连机制。

const source = new EventSource(url);

// 在 onerror 中实现断线重连
source.onerror = function (event) {
setTimeout(() => {
source = new EventSource(url);
}, 1000);
};
  1. 服务端主动要求客户端重连

服务器端可以通过发送特定事件(如 reconnect )来要求客户端重新连接。

// 服务器端设置重连延迟
emitter.send(SseEmitter.event()
.reconnectTime(5000L) // 5秒后重连
.data("数据"));

上面代码对应的报文

retry: 5000\n
data: 数据\n
\n
// 客户端
eventSource.addEventListener("reconnect", function (e) {
console.log("收到重连指令:", e.data);
eventSource.close(); // 关闭当前连接
setTimeout(() => {
connect(); // 重新连接
}, 5000);
});

执行这段代码后,客户端会先断开连接,然后在 5s 后重新连接。

Authentication 问题

  1. 同源, 会直接携带 cookie

  2. 跨源, (处于同一主域名的情况下可以使用 withCredentials 来是跨源请求携带 cookie)

    const eventSource = new EventSource("https://api.myapp.com/sse", {withCredentials:true})
  3. 使用 npm event-source-polyfill 包提供的 EventSourcePolyfil 来携带请求头

    const headers = {
    "Authorization": "..." // somewhat can update this token when it's refreshed
    }
    const eventSource = new EventSourcePolyfill("/sse", { headers })

    需要注意的是, EventSourcePolyfil 会将 Last-Event-ID 作为请求参数传递,而不是请求头

注意事项

如果使用 Nginx 配置需要将 proxy_buffering 关闭

proxy_buffering off;

设置响应头 x-accel-buffering 也可以让 Nginx 不缓冲后端服务器的响应

x-accel-buffering: no

分布式架构设计

  1. 主题分片 Topic Sharding
    • 按业务领域分片
      • 用户消息
      • 订单消息
      • 通知消息
    • 用户分片: 根据 userID hash 分配到不同的服务器

安全问题

  1. 防止 XSS 攻击: 一般来说, SSE 接收到数据大多数情况下会显示到前端, 因此需要防范 XSS 攻击

  2. 验证连接请求: 检查 Referer 头或使用身份验证令牌.

  3. 限制连接数据, 避免资源耗尽

    • 应用层面: 对每个连接计数

    • Nginx 层面: 限制每个 IP 的连接数

        # /etc/nginx/nginx.conf 或站点配置文件

      # limit_conn_zone 用于定义连接限制的存储区域
      # $binary_remote_addr 使用客户端 IP地址的二进制格式作为 key
      # sse_conn 共享内存区域的名称(这个是自定义的)
      # 10m 分配 10MB 内存,大概可以存储 160,000 个 IP 地址的连接状态
      limit_conn_zone $binary_remote_addr zone=sse_conn:10m;

      # 请求速率限制
      # rate=10r/s, 单 IP 每秒限制 10 个请求
      limit_req_zone $binary_remote_addr zone=sse_req:10m rate=10r/s;

      server {
      listen 80;
      server_name example.com;

      location /events {
      # 每个 IP 最多 10 个连接
      # limit_conn 应用连接限制指令
      # sse_conn 前面定义的共享内存区域
      # 10 每个 IP 最大连接数
      limit_conn sse_conn 10;

      # 请求速率限制
      # burst=20 令牌桶容量 20, 允许短时间内处理 20 个请求
      limit_req zone=sse_req burst=20;

      # SSE 特殊配置
      proxy_pass http://localhost:3000;
      proxy_set_header Connection '';
      proxy_http_version 1.1;
      chunked_transfer_encoding off;
      proxy_buffering off;
      proxy_cache off;
      proxy_read_timeout 24h;
      }
      }
    • 系统层面

      • 修改文件描述符 (默认是 1024)

        # 临时将当前 Shell 会话的文件描述符限制到 65535
        # -n 指定文件描述符(nofile)参数
        ulimit -n 65535

        # 永久修改 /etc/security/limits.conf
        echo "* soft nofile 65535" | sudo tee -a /etc/security/limits.conf # 软限制
        echo "* hard nofile 65535" | sudo tee -a /etc/security/limits.conf # 硬限制

        # 修改系统级别限制
        echo "fs.file-max = 2097152" | sudo tee -a /etc/sysctl. conf # 内核参数

        cat /proc/sys/fs/file-max # 查看系统限制
        cat /proc/sys/fs/file-nr # 查看当前使用情况

        sudo sysctl -w fs.file-max=2097152 # 直接修改(临时)
        sudo sysctl -p # 从文件加载(永久)
      • 优化 TCP 连接参数

        # /etc/sysctl.conf
        # 客户端连接请求 → [SYN队列] → [Accept队列] → accept()处理
        net.core.somaxconn = 4096 # 设置listen() 系统调用的最大连接队列长度
        net.ipv4.tcp_max_syn_backlog = 4096 # 设置 SYN 半连接队列的最大长度
        net.ipv4.ip_local_port_range = 1024 65535 # 可使用的临时端口范围
        net. ipv4.tcp_tw_reuse = 1 # 允许将处于 TIME_WAIT 状态的 socket 用于新的 TCP 连接
  4. 监控和日志记录

  5. 访问控制,只有授权用户才能接受敏感数据

性能优化

  1. 使用 gzip 对数据进行压缩
  2. 浏览器对同一网站打开的连接数(HTTP/1.1 6 个). 如果允许的话, 建议使用 HTTP/2 协议, 它支持更多并发连接(100+).
    • 需要避免每位用户的连接数量过多
  3. 引入心跳机制,定时发送 : heartbeat (以冒号开头表示在注释,会被 EventSource API 忽略)
  4. 优化数据降低带宽,只发送变更的数据
  5. 需要避免常见的资源泄露
    • 未关闭的数据库连接
    • 为已断开连接的客户端运行后台任务
    • 处理用户请求时分配的内存
    • 流式传输过程中创建的临时文件或缓冲区

参考文章