SSE(Server-Sent Events)技术文档

前置知识

什么是 SSE 请求

查看阮一峰的解释:SSE

前端建立连接后,服务端可以不间断向浏览器多次推送数据,简称流式响应。现在各大 AI 问答里,基本都是用的流式响应(渲染)。

SSE 响应格式

SSE 标准响应规范是文本,每一次响应都会是一个文本片段,片段可能有多行,每一行以特殊字段开头:<field>: <value>。最后一行是空字符串代表片段结束(结尾可能有多个空字符行,不影响)。

其中 field 的类型是:

type FieldType = "data" | "event" | "id" | "retry";

后端可能只会返回 data 开头的数据,具体情况需要和后端开发沟通。

可能存在的消息片段示例

id: xxx
data: xxx
event: xxx
retry: xxx
data: xxx
data: xxx
 

实际解析时,可能存在:一行数据拆成了 2 行,并且一般都是从冒号处截断,如:拆分后,第一行是"<field>: ",第二行是"<value>"

片段示例 1

data:
xxx
 

一、可以发起 SSE 请求的浏览器 API

数据截止 2025 年 8 月 22 日

1. EventSource (不推荐)

EventSource 是浏览器原生 API,用于发起 SSE 请求。 兼容性 95.93%,基本满足一般需求

功能薄弱:

  • 仅支持 GET 请求
  • 无法自定义请求头(如添加 Authorization)
  • 错误恢复机制简单,难以自定义

2. Fetch

注意:使用 fetch 也可以发起 SSE 请求,但是监听流式返回需要浏览器支持 ReadableStream,否则 response.body 是 undefined,无法解析流也就无法监听不间断的响应。

已知阿里的 mpass 框架支持 fetch 不支持 ReadableStream(@伍勇 提供的信息)。 兼容性:Fetch: 95.89%、ReadableStream: 94.8%

基于 fetch 实现 SSE 解析,有许多成熟的库,比如 x.ant.design 的 xStream、@microsoft/fetch-event-source(推荐)等。 x.ant.design 使用到 TextDecoderStream API,兼容性只有 94%。

@microsoft/fetch-event-source(推荐)

相比 EventSource,此库的优点:

  • 支持任意 HTTP 方法(如 POST、PUT)
  • 允许添加自定义请求头(如 Authorization: Bearer xxx)
  • 提供更精细的错误控制和重试逻辑

如果项目不需要兼容很旧的浏览器,用此方案即可。

示例:

import { fetchEventSource } from "@microsoft/fetch-event-source";
 
// 用于中断请求
const abortControllerRef = new AbortController();
 
fetchEventSource(chatApi, {
  method: "POST", // 支持 POST
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${accessToken}`,
  },
  body: JSON.stringify(body),
  signal: abortControllerRef.signal,
  onopen(response) {
    console.log("连接成功", response.status);
    return Promise.resolve();
  },
  onmessage(event) {
    event.data;
    console.log("收到数据:", event.data); // 处理服务器推送的数据
  },
  onclose() {
    console.log("连接关闭");
  },
  onerror(err) {
    console.error("发生错误:", err);
    // 返回 undefined 以停止重试,或抛出错误继续重试
  },
});

注意,@microsoft/fetch-event-source 也是依赖的 ReadableStream 类型的 response.body

二、原生实现(兼容性最好)

接下来有大段代码,最好复制到编辑器里查阅。

(下方示例假定后端返回的都是纯文本。如需支持二进制,请参考@microsoft/fetch-event-source 源码)

1. XHR 原生实现

import { TextStreamTransToChunk, type Message } from "./TextStreamTransToChunk";
 
/**
 * 使用原生 XMLHttpRequest 实现流式数据获取
 * 适用于不支持 fetch 流式响应的环境
 * @param params 配置参数
 * @returns 包含 abort 方法的对象,用于取消请求
 */
export function createXHRStream(
  url: string,
  params: {
    method: string;
    headers?: any;
    body: any;
    onMessage: (params: Message) => void;
    onOpen?: () => void;
    onClose?: () => void;
    onFinish?: () => void;
    onError?: (error: any) => void;
  }
) {
  const {
    method,
    body,
    headers,
    onMessage,
    onOpen,
    onClose,
    onFinish,
    onError,
  } = params;
 
  // 创建 XMLHttpRequest 实例
  const xhr = new XMLHttpRequest();
  let processedLength = 0;
  let isOpened = false;
 
  // 获取访问令牌
  const textStreamTransform = new TextStreamTransToChunk({
    onMessage: onMessage,
  });
 
  // 配置请求
  xhr.open(method, url, true);
  xhr.withCredentials = true;
 
  // 设置请求头
  headers &&
    Object.keys(headers).forEach((key) => {
      xhr.setRequestHeader(key, headers[key]);
    });
 
  // 设置响应类型为文本
  xhr.responseType = "text";
 
  // 监听 readyState 变化
  xhr.onreadystatechange = function () {
    if (xhr.readyState === 1) {
      // OPENED state
      if (!isOpened) {
        onOpen?.();
        isOpened = true;
      }
    }
 
    if (xhr.readyState === 3) {
      // LOADING state
      // 处理流式数据
      if (xhr.responseText) {
        const newData = xhr.responseText.substring(processedLength);
        if (newData.length > 0) {
          textStreamTransform.addText(newData);
          processedLength = xhr.responseText.length;
        }
      }
    }
 
    if (xhr.readyState === 4) {
      // DONE state
      if (!isOpened) {
        onOpen?.();
        isOpened = true;
      }
 
      if (xhr.status >= 200 && xhr.status < 300) {
        // 处理剩余数据
        const remainingData = xhr.responseText.substring(processedLength);
        if (remainingData.length > 0) {
          textStreamTransform.addText(remainingData);
        }
        onFinish?.();
      } else {
        onError?.(new Error(`HTTP ${xhr.status}: ${xhr.statusText}`));
      }
      onClose?.();
    }
  };
 
  // 监听错误事件
  xhr.onerror = function () {
    if (!isOpened) {
      onOpen?.();
      isOpened = true;
    }
    onError?.(new Error("Network error"));
    onClose?.();
  };
 
  // 监听超时事件
  xhr.ontimeout = function () {
    if (!isOpened) {
      onOpen?.();
      isOpened = true;
    }
    onError?.(new Error("Request timeout"));
    onClose?.();
  };
 
  // 发送请求
  if (body) {
    xhr.send(body);
  } else {
    xhr.send();
  }
 
  // 返回 abort 方法用于取消请求
  return {
    abort: () => {
      if (!isOpened) {
        onOpen?.();
        isOpened = true;
      }
      xhr.abort();
      onClose?.();
    },
  };
}
 
// 测试代码
const test = () => {
  const accessToken = "xxx";
  createXHRStream(
    "https://ai-gateway-test.sany.com.cn/person/v1/ai/chat-messages",
    {
      method: "POST",
      body: JSON.stringify({ query: "我有哪些待办" }),
      headers: {
        Authorization: "Bearer" + " " + accessToken,
        "Content-Type": "application/json",
      },
      onMessage(e) {
        console.log("chunk ", e);
      },
    }
  );
};

工具函数:TextStreamTransToChunk

用于处理响应文本片段

type FieldType = "data" | "event" | "id" | "retry";
 
export type Message = {
  data?: string;
  event?: string;
  id?: string;
  retry?: number;
};
 
/**
 * 手动处理sse的文本流响应
 * 将完整的"data: {xxxx}"、或者不完整的"data:"合并处理,保证onMessage回调能收到{data: "string"}这样的对象
 */
export class TextStreamTransToChunk {
  /**
   * 处理文本流过程中,遇到的最后一个类型消息
   */
  private preField?: FieldType;
  private message: Message;
 
  onId?: (id: string) => void;
  onRetry?: (retry: number) => void;
  /**
   * 整段Text已处理完毕
   */
  onMessage?: (params: Message) => void;
 
  constructor(params: {
    onMessage: (params: Message) => void;
    onId?: (id: string) => void;
    onRetry?: (retry: number) => void;
  }) {
    this.onMessage = params.onMessage;
    this.onId = params.onId;
    this.onRetry = params.onRetry;
    this.message = {
      data: "",
      event: "",
      id: "",
      retry: undefined,
    };
  }
 
  /**
   * 添加整段Text消息
   * @param val 有多种可能性
   * - 完整消息,目前我们的后端只会返回data开头的数据
   * ```sh
   * id: xxx
   * data: xxx
   * event: xxx
   * retry: xxx
   * data: xxx
   * data: xxx
   * ""
   * ```
   * - 不完整消息
   * ```sh
   * data: xxx
   * data:
   * ```
   */
  addText(text: string) {
    const lines = text.split("\n");
    lines.forEach((line) => {
      this.processLine(line);
    });
  }
 
  /**
   * 处理行数据
   * @param line "data: xxx"、"data: "、"id: xx"、 "event: xx"、"retry"
   * @returns
   */
  private processLine(line: string) {
    if (
      line &&
      (line === "data" || line === "data:" || line.indexOf("data:") !== 0)
    ) {
      console.log("break line", line);
    }
    if (line.length === 0) {
      // console.log("line end ----------------------------> onMessage");
      // 空行表示一个完整消息的结束
      if (
        this.message.data ||
        this.message.event ||
        this.message.id ||
        this.message.retry !== undefined
      ) {
        this.onMessage?.({
          data: this.message.data,
          event: this.message.event,
          id: this.message.id,
          retry: this.message.retry,
        });
 
        // 重置消息对象
        this.message = {
          data: "",
          event: "",
          id: "",
          retry: undefined,
        };
      }
      return;
    }
 
    // 查找字段名和值的分隔符
    const colonIndex = line.indexOf(":");
    /**
     *  2种情况,整行作为上一个field类型的值
     * - 没有冒号的情况
     * - index大于5(以"data" | "event" | "id" | "retry" 开头的行,冒号index不会超过5)
     */
    if (colonIndex === -1 || colonIndex > 5) {
      this.processField(this.preField!, line.trim() as FieldType);
    } else {
      const field = line.substring(0, colonIndex).trim() as FieldType;
      const value = line.substring(colonIndex + 1).trim();
      this.processField(field, value);
    }
  }
 
  /**
   * 处理已识别类型的数据
   * @param field
   * @param value
   */
  private processField(field: FieldType, value: string) {
    this.preField = field;
    switch (field) {
      case "data":
        // 连续接收多个 data 字段需要拼接,每行之间用换行符分隔
        this.message.data = this.message.data
          ? this.message.data + "\n" + value
          : value;
        break;
      case "event":
        this.message.event = value;
        break;
      case "id":
        this.message.id = value;
        this.onId?.(value);
        break;
      case "retry":
        const retry = parseInt(value, 10);
        if (!isNaN(retry)) {
          this.message.retry = retry;
          this.onRetry?.(retry);
        }
        break;
      default:
        // 忽略未知字段
        break;
    }
  }
 
  // 提供方法获取当前消息状态
  getCurrentMessage() {
    return { ...this.message };
  }
}

2. Axios 实现

axios 的方案底层也是 xhr,实现方案和 xhr 原生实现也差不多,如果你的项目里已经用了 axios,可以直接使用此方案

import axios from "axios";
import { TextStreamTransToChunk, type Message } from "./TextStreamTransToChunk";
 
/**
 * 使用 axios 实现流式数据获取
 * 适用于不支持 fetch 流式响应的环境
 * @param url 请求地址
 * @param params 配置参数
 * @returns 包含 cancel 方法的对象,用于取消请求
 */
export function createAxiosStream(
  url: string,
  params: {
    method: string;
    headers?: any;
    body: any;
    onMessage: (params: Message) => void;
    onOpen?: () => void;
    onClose?: () => void;
    onFinish?: () => void;
    onError?: (error: any) => void;
  }
) {
  const CancelToken = axios.CancelToken;
  const source = CancelToken.source();
  let processedLength = 0;
  let isOpened = false;
 
  const {
    method,
    body,
    headers,
    onMessage,
    onOpen,
    onClose,
    onFinish,
    onError,
  } = params;
 
  const textStreamTransform = new TextStreamTransToChunk({
    onMessage: onMessage,
  });
 
  // 对于不支持流式响应的环境,使用轮询或者分块请求方式
  axios({
    url,
    method: method,
    data: body,
    headers,
    responseType: "text",
    cancelToken: source.token,
    onDownloadProgress: function (progressEvent) {
      // currentTarget.responseText 可能无法获取到实时数据
      const xhr = progressEvent?.event.currentTarget;
      const responseText = xhr?.responseText;
 
      // 检查连接是否已打开
      if (xhr.readyState >= 1 && !isOpened) {
        onOpen?.();
        isOpened = true;
      }
 
      // console.log(xhr, responseText);
      if (xhr.readyState === 3 && responseText) {
        // LOADING
        const newData = responseText.substring(processedLength);
        if (newData.length > 0) {
          textStreamTransform.addText(newData);
          processedLength = xhr.responseText.length;
        }
      }
    },
  })
    .then((response) => {
      // 确保连接已打开
      if (!isOpened) {
        onOpen?.();
        isOpened = true;
      }
 
      // 处理剩余数据
      const remainingData = response.data.substring(processedLength);
      if (remainingData.length > 0) {
        textStreamTransform.addText(remainingData);
      }
      onFinish?.();
      onClose?.();
    })
    .catch((error) => {
      if (axios.isCancel(error)) {
        // 确保连接已打开
        if (!isOpened) {
          onOpen?.();
          isOpened = true;
        }
        console.log("Request canceled", error.message);
        onClose?.();
      } else {
        // 确保连接已打开
        if (!isOpened) {
          onOpen?.();
          isOpened = true;
        }
        onError?.(error);
        onClose?.();
      }
    });
 
  // 返回 cancel 方法用于取消请求
  return {
    abort: (message?: string) => {
      // 确保连接已打开
      if (!isOpened) {
        onOpen?.();
        isOpened = true;
      }
      source.cancel(message);
      onClose?.();
    },
  };
}
 
// 测试代码
const test = () => {
  const accessToken = "xxx";
  createAxiosStream(
    "https://ai-gateway-test.sany.com.cn/person/v1/ai/chat-messages",
    {
      method: "POST",
      body: JSON.stringify({ query: "我有哪些待办" }),
      headers: {
        Authorization: "Bearer" + " " + accessToken,
        "Content-Type": "application/json",
      },
      onMessage(e) {
        console.log("chunk ", e);
      },
    }
  );
};

三、mock 一个流式响应

开发过程中,后端接口不一定及时开发完,此时需要 mock 流式响应,方便前端代码编写。 如果不想自己开发一个 mock 服务端,可以考虑前端 mock 数据:

创建

自行创建一个 ReadableStream 的流,作为 fetch 响应里的 response.body

import { generateUUID } from "./uuid";
import { sleep } from "@/api/mock";
 
export let useMock = false;
const _contentChunks =
  "这是一个关于一只小狐狸和一座会唱歌的森林的故事:在遥远的北方,有一片被薄雾笼罩的古老森林,......。🌙✨你想听下一段吗?".match(
    /.{1,4}/g
  ) as string[];
 
export function mockReadableStream(
  askMessage: string,
  conversation_id?: string
) {
  const sseChunks: string[] = [];
  const contentChunks = [
    ..._contentChunks,
    `你的问题是:${askMessage}`,
    "message_end占位",
  ];
  for (let i = 0; i < contentChunks.length; i++) {
    const sseEventPart = `event: message\ndata: {"answer": "${
      contentChunks[i]
    }","conversation_id": "${conversation_id || generateUUID()}","event": "${
      i !== contentChunks.length - 1 ? "agent_message" : "message_end"
    }","id": "${i}"}\n\n`;
    sseChunks.push(sseEventPart);
  }
 
  return new ReadableStream({
    async start(controller) {
      for (const chunk of sseChunks) {
        await sleep(200);
        controller.enqueue(new TextEncoder().encode(chunk));
      }
      controller.close();
    },
  });
}
 
/**
 * 读取sse文本流兼容方法
 * @param response
 * @param callback
 * @returns
 */
export async function processStream(
  response: Response,
  callback: (chunk: { data: any }) => any
) {
  const reader = response.body?.getReader();
  if (!reader) {
    return;
  }
  const decoder = new TextDecoder();
  let partialChunk = "";
 
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      if (partialChunk)
        callback({
          data: partialChunk,
        });
      break;
    }
 
    const text = partialChunk + decoder.decode(value, { stream: true });
    const lines = text.split("\n");
 
    partialChunk = lines.pop() || "";
    lines.forEach((line) => {
      if (line) {
        const jsonString = line.replace("data:", "");
        jsonString &&
          callback({
            data: jsonString,
          });
      }
    });
  }
}

使用 mock 流

const response = {
  body: mockReadableStream(param.question, param.conversation_id),
  ok: true,
};
processStream(response, (chunk) => {
  console.log(chunk);
});

四、如何流式渲染一个自定义UI的回答(而不是纯文本markdown渲染)?

  1. 当要渲染自定义UI时(如表格),需要大模型流式返回特殊格式的消息,一般是JSON
  2. 前后端约定好,JSON里有一个字段标识(如UI_TYPE):当前要渲染哪种自定义UI。每一种自定义UI都有一个标识值。
  3. 前端在接收流式响应时,一旦发现流的响应符合JSON特征或者特定结构,就开始尝试修复JSON(因为数据是流式返回,所以已经收到的数据大概率不是完整JSON)
  4. 使用jsonrepair修复残缺JSON数据,并渲染返回的数据
import { jsonrepair } from "jsonrepair";
 
/**
 * 尝试修复json
 * @param jsonString 
 * @returns 
 */
export function repairAndParseJson(jsonString: string) {
  if (!jsonString || typeof jsonString !== "string") {
    return null;
  }
 
  try {
    // 首先尝试直接解析
    return JSON.parse(jsonString);
  } catch (e) {
    try {
      // 使用jsonrepair修复
      const repaired = jsonrepair(jsonString);
      return JSON.parse(repaired);
    } catch (e2) {
      console.error("JSON修复失败:", e2);
      return null;
    }
  }
}
  1. 写自定义UI代码时要注意,JSON里的每个字段都有可能为空,要做全量的代码健壮性。