SSE(Server-Sent Events)技术文档
前置知识
什么是 SSE 请求
查看阮一峰的解释:SSE
前端建立连接后,服务端可以不间断向浏览器多次推送数据,简称流式响应。现在各大 AI 问答里,基本都是用的流式响应(渲染)。
SSE 响应格式
SSE 标准响应规范是文本,每一次响应都会是一个文本片段,片段可能有多行,每一行以特殊字段开头:<field>: <value>。最后一行是空字符串代表片段结束(结尾可能有多个空字符行,不影响)。
其中 field 的类型是:
type FieldType = "data" | "event" | "id" | "retry";后端可能只会返回 data 开头的数据,具体情况需要和后端开发沟通。
可能存在的消息片段示例
id: xxx
data: xxx
event: xxx
retry: xxx
data: xxx
data: xxx
实际解析时,可能存在:一行数据拆成了 2 行,并且一般都是从冒号处截断,如:拆分后,第一行是"<field>: ",第二行是"<value>"。
片段示例 1
data:
xxx
一、可以发起 SSE 请求的浏览器 API
数据截止 2025 年 8 月 22 日
1. EventSource (不推荐)
EventSource 是浏览器原生 API,用于发起 SSE 请求。 兼容性 95.93%,基本满足一般需求
功能薄弱:
- 仅支持 GET 请求
- 无法自定义请求头(如添加 Authorization)
- 错误恢复机制简单,难以自定义
2. Fetch
注意:使用 fetch 也可以发起 SSE 请求,但是监听流式返回需要浏览器支持 ReadableStream,否则 response.body 是 undefined,无法解析流也就无法监听不间断的响应。
已知阿里的 mpass 框架支持 fetch 不支持 ReadableStream(@伍勇 提供的信息)。 兼容性:Fetch: 95.89%、ReadableStream: 94.8%
基于 fetch 实现 SSE 解析,有许多成熟的库,比如 x.ant.design 的 xStream、@microsoft/fetch-event-source(推荐)等。 x.ant.design 使用到 TextDecoderStream API,兼容性只有 94%。
@microsoft/fetch-event-source(推荐)
相比 EventSource,此库的优点:
- 支持任意 HTTP 方法(如 POST、PUT)
- 允许添加自定义请求头(如 Authorization: Bearer xxx)
- 提供更精细的错误控制和重试逻辑
如果项目不需要兼容很旧的浏览器,用此方案即可。
示例:
import { fetchEventSource } from "@microsoft/fetch-event-source";
// 用于中断请求
const abortControllerRef = new AbortController();
fetchEventSource(chatApi, {
method: "POST", // 支持 POST
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${accessToken}`,
},
body: JSON.stringify(body),
signal: abortControllerRef.signal,
onopen(response) {
console.log("连接成功", response.status);
return Promise.resolve();
},
onmessage(event) {
event.data;
console.log("收到数据:", event.data); // 处理服务器推送的数据
},
onclose() {
console.log("连接关闭");
},
onerror(err) {
console.error("发生错误:", err);
// 返回 undefined 以停止重试,或抛出错误继续重试
},
});注意,@microsoft/fetch-event-source 也是依赖的 ReadableStream 类型的 response.body
二、原生实现(兼容性最好)
接下来有大段代码,最好复制到编辑器里查阅。
(下方示例假定后端返回的都是纯文本。如需支持二进制,请参考@microsoft/fetch-event-source 源码)
1. XHR 原生实现
import { TextStreamTransToChunk, type Message } from "./TextStreamTransToChunk";
/**
* 使用原生 XMLHttpRequest 实现流式数据获取
* 适用于不支持 fetch 流式响应的环境
* @param params 配置参数
* @returns 包含 abort 方法的对象,用于取消请求
*/
export function createXHRStream(
url: string,
params: {
method: string;
headers?: any;
body: any;
onMessage: (params: Message) => void;
onOpen?: () => void;
onClose?: () => void;
onFinish?: () => void;
onError?: (error: any) => void;
}
) {
const {
method,
body,
headers,
onMessage,
onOpen,
onClose,
onFinish,
onError,
} = params;
// 创建 XMLHttpRequest 实例
const xhr = new XMLHttpRequest();
let processedLength = 0;
let isOpened = false;
// 获取访问令牌
const textStreamTransform = new TextStreamTransToChunk({
onMessage: onMessage,
});
// 配置请求
xhr.open(method, url, true);
xhr.withCredentials = true;
// 设置请求头
headers &&
Object.keys(headers).forEach((key) => {
xhr.setRequestHeader(key, headers[key]);
});
// 设置响应类型为文本
xhr.responseType = "text";
// 监听 readyState 变化
xhr.onreadystatechange = function () {
if (xhr.readyState === 1) {
// OPENED state
if (!isOpened) {
onOpen?.();
isOpened = true;
}
}
if (xhr.readyState === 3) {
// LOADING state
// 处理流式数据
if (xhr.responseText) {
const newData = xhr.responseText.substring(processedLength);
if (newData.length > 0) {
textStreamTransform.addText(newData);
processedLength = xhr.responseText.length;
}
}
}
if (xhr.readyState === 4) {
// DONE state
if (!isOpened) {
onOpen?.();
isOpened = true;
}
if (xhr.status >= 200 && xhr.status < 300) {
// 处理剩余数据
const remainingData = xhr.responseText.substring(processedLength);
if (remainingData.length > 0) {
textStreamTransform.addText(remainingData);
}
onFinish?.();
} else {
onError?.(new Error(`HTTP ${xhr.status}: ${xhr.statusText}`));
}
onClose?.();
}
};
// 监听错误事件
xhr.onerror = function () {
if (!isOpened) {
onOpen?.();
isOpened = true;
}
onError?.(new Error("Network error"));
onClose?.();
};
// 监听超时事件
xhr.ontimeout = function () {
if (!isOpened) {
onOpen?.();
isOpened = true;
}
onError?.(new Error("Request timeout"));
onClose?.();
};
// 发送请求
if (body) {
xhr.send(body);
} else {
xhr.send();
}
// 返回 abort 方法用于取消请求
return {
abort: () => {
if (!isOpened) {
onOpen?.();
isOpened = true;
}
xhr.abort();
onClose?.();
},
};
}
// 测试代码
const test = () => {
const accessToken = "xxx";
createXHRStream(
"https://ai-gateway-test.sany.com.cn/person/v1/ai/chat-messages",
{
method: "POST",
body: JSON.stringify({ query: "我有哪些待办" }),
headers: {
Authorization: "Bearer" + " " + accessToken,
"Content-Type": "application/json",
},
onMessage(e) {
console.log("chunk ", e);
},
}
);
};工具函数:TextStreamTransToChunk
用于处理响应文本片段
type FieldType = "data" | "event" | "id" | "retry";
export type Message = {
data?: string;
event?: string;
id?: string;
retry?: number;
};
/**
* 手动处理sse的文本流响应
* 将完整的"data: {xxxx}"、或者不完整的"data:"合并处理,保证onMessage回调能收到{data: "string"}这样的对象
*/
export class TextStreamTransToChunk {
/**
* 处理文本流过程中,遇到的最后一个类型消息
*/
private preField?: FieldType;
private message: Message;
onId?: (id: string) => void;
onRetry?: (retry: number) => void;
/**
* 整段Text已处理完毕
*/
onMessage?: (params: Message) => void;
constructor(params: {
onMessage: (params: Message) => void;
onId?: (id: string) => void;
onRetry?: (retry: number) => void;
}) {
this.onMessage = params.onMessage;
this.onId = params.onId;
this.onRetry = params.onRetry;
this.message = {
data: "",
event: "",
id: "",
retry: undefined,
};
}
/**
* 添加整段Text消息
* @param val 有多种可能性
* - 完整消息,目前我们的后端只会返回data开头的数据
* ```sh
* id: xxx
* data: xxx
* event: xxx
* retry: xxx
* data: xxx
* data: xxx
* ""
* ```
* - 不完整消息
* ```sh
* data: xxx
* data:
* ```
*/
addText(text: string) {
const lines = text.split("\n");
lines.forEach((line) => {
this.processLine(line);
});
}
/**
* 处理行数据
* @param line "data: xxx"、"data: "、"id: xx"、 "event: xx"、"retry"
* @returns
*/
private processLine(line: string) {
if (
line &&
(line === "data" || line === "data:" || line.indexOf("data:") !== 0)
) {
console.log("break line", line);
}
if (line.length === 0) {
// console.log("line end ----------------------------> onMessage");
// 空行表示一个完整消息的结束
if (
this.message.data ||
this.message.event ||
this.message.id ||
this.message.retry !== undefined
) {
this.onMessage?.({
data: this.message.data,
event: this.message.event,
id: this.message.id,
retry: this.message.retry,
});
// 重置消息对象
this.message = {
data: "",
event: "",
id: "",
retry: undefined,
};
}
return;
}
// 查找字段名和值的分隔符
const colonIndex = line.indexOf(":");
/**
* 2种情况,整行作为上一个field类型的值
* - 没有冒号的情况
* - index大于5(以"data" | "event" | "id" | "retry" 开头的行,冒号index不会超过5)
*/
if (colonIndex === -1 || colonIndex > 5) {
this.processField(this.preField!, line.trim() as FieldType);
} else {
const field = line.substring(0, colonIndex).trim() as FieldType;
const value = line.substring(colonIndex + 1).trim();
this.processField(field, value);
}
}
/**
* 处理已识别类型的数据
* @param field
* @param value
*/
private processField(field: FieldType, value: string) {
this.preField = field;
switch (field) {
case "data":
// 连续接收多个 data 字段需要拼接,每行之间用换行符分隔
this.message.data = this.message.data
? this.message.data + "\n" + value
: value;
break;
case "event":
this.message.event = value;
break;
case "id":
this.message.id = value;
this.onId?.(value);
break;
case "retry":
const retry = parseInt(value, 10);
if (!isNaN(retry)) {
this.message.retry = retry;
this.onRetry?.(retry);
}
break;
default:
// 忽略未知字段
break;
}
}
// 提供方法获取当前消息状态
getCurrentMessage() {
return { ...this.message };
}
}2. Axios 实现
axios 的方案底层也是 xhr,实现方案和 xhr 原生实现也差不多,如果你的项目里已经用了 axios,可以直接使用此方案
import axios from "axios";
import { TextStreamTransToChunk, type Message } from "./TextStreamTransToChunk";
/**
* 使用 axios 实现流式数据获取
* 适用于不支持 fetch 流式响应的环境
* @param url 请求地址
* @param params 配置参数
* @returns 包含 cancel 方法的对象,用于取消请求
*/
export function createAxiosStream(
url: string,
params: {
method: string;
headers?: any;
body: any;
onMessage: (params: Message) => void;
onOpen?: () => void;
onClose?: () => void;
onFinish?: () => void;
onError?: (error: any) => void;
}
) {
const CancelToken = axios.CancelToken;
const source = CancelToken.source();
let processedLength = 0;
let isOpened = false;
const {
method,
body,
headers,
onMessage,
onOpen,
onClose,
onFinish,
onError,
} = params;
const textStreamTransform = new TextStreamTransToChunk({
onMessage: onMessage,
});
// 对于不支持流式响应的环境,使用轮询或者分块请求方式
axios({
url,
method: method,
data: body,
headers,
responseType: "text",
cancelToken: source.token,
onDownloadProgress: function (progressEvent) {
// currentTarget.responseText 可能无法获取到实时数据
const xhr = progressEvent?.event.currentTarget;
const responseText = xhr?.responseText;
// 检查连接是否已打开
if (xhr.readyState >= 1 && !isOpened) {
onOpen?.();
isOpened = true;
}
// console.log(xhr, responseText);
if (xhr.readyState === 3 && responseText) {
// LOADING
const newData = responseText.substring(processedLength);
if (newData.length > 0) {
textStreamTransform.addText(newData);
processedLength = xhr.responseText.length;
}
}
},
})
.then((response) => {
// 确保连接已打开
if (!isOpened) {
onOpen?.();
isOpened = true;
}
// 处理剩余数据
const remainingData = response.data.substring(processedLength);
if (remainingData.length > 0) {
textStreamTransform.addText(remainingData);
}
onFinish?.();
onClose?.();
})
.catch((error) => {
if (axios.isCancel(error)) {
// 确保连接已打开
if (!isOpened) {
onOpen?.();
isOpened = true;
}
console.log("Request canceled", error.message);
onClose?.();
} else {
// 确保连接已打开
if (!isOpened) {
onOpen?.();
isOpened = true;
}
onError?.(error);
onClose?.();
}
});
// 返回 cancel 方法用于取消请求
return {
abort: (message?: string) => {
// 确保连接已打开
if (!isOpened) {
onOpen?.();
isOpened = true;
}
source.cancel(message);
onClose?.();
},
};
}
// 测试代码
const test = () => {
const accessToken = "xxx";
createAxiosStream(
"https://ai-gateway-test.sany.com.cn/person/v1/ai/chat-messages",
{
method: "POST",
body: JSON.stringify({ query: "我有哪些待办" }),
headers: {
Authorization: "Bearer" + " " + accessToken,
"Content-Type": "application/json",
},
onMessage(e) {
console.log("chunk ", e);
},
}
);
};三、mock 一个流式响应
开发过程中,后端接口不一定及时开发完,此时需要 mock 流式响应,方便前端代码编写。 如果不想自己开发一个 mock 服务端,可以考虑前端 mock 数据:
创建
自行创建一个 ReadableStream 的流,作为 fetch 响应里的 response.body
import { generateUUID } from "./uuid";
import { sleep } from "@/api/mock";
export let useMock = false;
const _contentChunks =
"这是一个关于一只小狐狸和一座会唱歌的森林的故事:在遥远的北方,有一片被薄雾笼罩的古老森林,......。🌙✨你想听下一段吗?".match(
/.{1,4}/g
) as string[];
export function mockReadableStream(
askMessage: string,
conversation_id?: string
) {
const sseChunks: string[] = [];
const contentChunks = [
..._contentChunks,
`你的问题是:${askMessage}`,
"message_end占位",
];
for (let i = 0; i < contentChunks.length; i++) {
const sseEventPart = `event: message\ndata: {"answer": "${
contentChunks[i]
}","conversation_id": "${conversation_id || generateUUID()}","event": "${
i !== contentChunks.length - 1 ? "agent_message" : "message_end"
}","id": "${i}"}\n\n`;
sseChunks.push(sseEventPart);
}
return new ReadableStream({
async start(controller) {
for (const chunk of sseChunks) {
await sleep(200);
controller.enqueue(new TextEncoder().encode(chunk));
}
controller.close();
},
});
}
/**
* 读取sse文本流兼容方法
* @param response
* @param callback
* @returns
*/
export async function processStream(
response: Response,
callback: (chunk: { data: any }) => any
) {
const reader = response.body?.getReader();
if (!reader) {
return;
}
const decoder = new TextDecoder();
let partialChunk = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
if (partialChunk)
callback({
data: partialChunk,
});
break;
}
const text = partialChunk + decoder.decode(value, { stream: true });
const lines = text.split("\n");
partialChunk = lines.pop() || "";
lines.forEach((line) => {
if (line) {
const jsonString = line.replace("data:", "");
jsonString &&
callback({
data: jsonString,
});
}
});
}
}使用 mock 流
const response = {
body: mockReadableStream(param.question, param.conversation_id),
ok: true,
};
processStream(response, (chunk) => {
console.log(chunk);
});四、如何流式渲染一个自定义UI的回答(而不是纯文本markdown渲染)?
- 当要渲染自定义UI时(如表格),需要大模型流式返回特殊格式的消息,一般是JSON
- 前后端约定好,JSON里有一个字段标识(如UI_TYPE):当前要渲染哪种自定义UI。每一种自定义UI都有一个标识值。
- 前端在接收流式响应时,一旦发现流的响应符合JSON特征或者特定结构,就开始尝试修复JSON(因为数据是流式返回,所以已经收到的数据大概率不是完整JSON)
- 使用jsonrepair修复残缺JSON数据,并渲染返回的数据
import { jsonrepair } from "jsonrepair";
/**
* 尝试修复json
* @param jsonString
* @returns
*/
export function repairAndParseJson(jsonString: string) {
if (!jsonString || typeof jsonString !== "string") {
return null;
}
try {
// 首先尝试直接解析
return JSON.parse(jsonString);
} catch (e) {
try {
// 使用jsonrepair修复
const repaired = jsonrepair(jsonString);
return JSON.parse(repaired);
} catch (e2) {
console.error("JSON修复失败:", e2);
return null;
}
}
}- 写自定义UI代码时要注意,JSON里的每个字段都有可能为空,要做全量的代码健壮性。