2025 年,当 ChatGPT、Claude、DeepSeek 等大模型应用全面普及后,「流式响应」从一个可选特性变成了开发者必须掌握的核心技能。据统计,全球 Top 100 的 AI 应用中,92% 采用了流式输出,而支撑这一能力的底层技术正是 Web Streams API。如果你还在用 response.json() 一次性等待整个响应,或者面对大文件上传时内存飙升束手无策,这篇文章会彻底改变你处理数据流的方式。
🔍 一、为什么你需要 Web Streams API
在传统 HTTP 请求模型中,客户端发送请求,服务端返回完整响应——整个过程中,数据被「攒」成一个整体再交付。这种模式在小数据量下没有问题,但在三个场景中会暴露出致命缺陷:
场景一:LLM 流式响应。 GPT-4 生成一篇 1000 字的文章需要 8-15 秒。如果等全部生成完再显示,用户会以为系统卡死。流式输出让用户在 0.5 秒内看到第一个字,体验截然不同。
场景二:大文件上传/下载。 一个 2GB 的视频文件,用 Blob 一次性读入内存会直接让浏览器 OOM(Out of Memory)。流式读取可以将内存占用控制在几十 MB 以内。
场景三:实时数据管道。 从 SSE(Server-Sent Events)接收股票行情、日志流、传感器数据,需要实时转换并渲染,而不是攒够一批再处理。
Web Streams API 的核心设计哲学是背压(Backpressure)——当下游处理速度跟不上上游生产速度时,自动减速,避免内存爆炸。这是 Node.js Streams 的浏览器标准化版本,从 2023 年起所有主流浏览器(Chrome 89+、Firefox 110+、Safari 16.4+)和 Node.js 18+ 都已完整支持。
💡 **提示:**Web Streams API 和 Node.js 的
stream模块是两套不同的 API,但可以通过ReadableStream.from()(Node.js 20+)和Readable.toWeb()/Readable.fromWeb()互相转换。
🌊 二、核心 API 深度解析
Web Streams API 定义了四种流类型:ReadableStream(可读流)、WritableStream(可写流)、TransformStream(转换流)和 ByteLengthQueuingStrategy / CountQueuingStrategy(队列策略)。实际开发中,最常用的是前三个。
2.1 ReadableStream:数据的源头
ReadableStream 是所有流式操作的起点。它内部维护一个队列,通过 pull 函数按需生产数据,通过 cancel 函数处理取消信号。
// 创建一个自定义 ReadableStream:生成斐波那契数列
function createFibonacciStream(count = 100) {
let a = 0, b = 1, index = 0;
return new ReadableStream({
start(controller) {
console.log('流已创建,开始生成斐波那契数列');
},
pull(controller) {
if (index >= count) {
controller.close(); // 达到数量上限,关闭流
return;
}
controller.enqueue(JSON.stringify({ index, value: a }));
[a, b] = [b, a + b];
index++;
},
cancel(reason) {
console.log(`流被取消:${reason}`);
}
}, {
highWaterMark: 16, // 队列缓冲区大小(对象个数)
});
}
// 消费 ReadableStream
const fibStream = createFibonacciStream(10);
const reader = fibStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const item = JSON.parse(value);
console.log(`F(${item.index}) = ${item.value}`);
}
📌 记住:
highWaterMark是流的「水位线」——当内部队列中积压的 chunk 数量超过这个值时,pull函数将不再被调用,直到消费者读走数据。这就是背压机制的核心。
2.2 WritableStream:数据的终点
WritableStream 代表一个可写入的数据目标。它通过 write 函数接收数据,通过 close 和 abort 函数控制生命周期。
// 创建一个 WritableStream:将数据写入内存数组(模拟数据库写入)
function createBatchWriter(batchSize = 5) {
const buffer = [];
let totalWritten = 0;
return new WritableStream({
async write(chunk) {
buffer.push(chunk);
if (buffer.length >= batchSize) {
// 模拟批量写入数据库(如 INSERT INTO ... VALUES (...),(...),(...))
console.log(`批量写入 ${buffer.length} 条记录:`, buffer.map(c => c.id));
await new Promise(r => setTimeout(r, 50)); // 模拟 I/O 延迟
totalWritten += buffer.length;
buffer.length = 0; // 清空缓冲区
}
},
close() {
if (buffer.length > 0) {
console.log(`写入剩余 ${buffer.length} 条记录`);
totalWritten += buffer.length;
}
console.log(`✅ 流关闭,共写入 ${totalWritten} 条记录`);
},
abort(reason) {
console.error(`❌ 流被中止:${reason},缓冲区中 ${buffer.length} 条数据丢失`);
}
});
}
// 配合 ReadableStream 使用:pipeline 模式
const source = createFibonacciStream(12);
await source.pipeTo(createBatchWriter(5));
2.3 TransformStream:数据的转换
TransformStream 是 Web Streams API 最优雅的设计——它同时实现 ReadableStream 和 WritableStream,接收输入流,产出转换后的输出流。这是构建数据管道(Pipeline)的核心积木。
// 创建一个 JSON 解析 TransformStream
function createJSONParseTransform() {
return new TransformStream({
transform(chunk, controller) {
try {
// chunk 可能是 Uint8Array(来自 fetch)或 string
const text = typeof chunk === 'string' ? chunk
: new TextDecoder().decode(chunk);
const parsed = JSON.parse(text);
controller.enqueue(parsed);
} catch (e) {
controller.error(new Error(`JSON 解析失败: ${e.message}`));
}
}
});
}
⚡ 三、实战:LLM 流式响应处理
LLM 流式响应是 2025-2026 年 Web Streams API 最重要的应用场景。主流大模型 API(OpenAI、Claude、DeepSeek、通义千问)都支持 SSE(Server-Sent Events)格式的流式输出,返回格式为:
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]}
data: [DONE]
3.1 基础实现:逐字输出
// LLM 流式响应处理器:支持 OpenAI 兼容 API 格式
async function streamLLMResponse(apiUrl, apiKey, messages, onToken) {
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`,
},
body: JSON.stringify({
model: 'deepseek-chat',
messages,
stream: true, // 关键:启用流式输出
}),
});
if (!response.ok) {
throw new Error(`API 请求失败: ${response.status} ${response.statusText}`);
}
// 使用 ReadableStream 的 pipeThrough 链式处理
const reader = response.body
.pipeThrough(createSSEDecoder()) // 解码 Uint8Array → SSE 行
.pipeThrough(createSSEDataParser()) // 解析 SSE data 字段 → JSON
.getReader();
let fullText = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 处理每个 token
if (value === '[DONE]') break;
const token = value.choices?.[0]?.delta?.content || '';
if (token) {
fullText += token;
onToken(token, fullText); // 回调:单个 token + 累积全文
}
}
return fullText;
}
// SSE 解码器:将 Uint8Array 流转换为行文本流
function createSSEDecoder() {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += new TextDecoder().decode(chunk, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 最后一行可能不完整,保留
for (const line of lines) {
const trimmed = line.trim();
if (trimmed) controller.enqueue(trimmed);
}
},
flush(controller) {
if (buffer.trim()) controller.enqueue(buffer.trim());
}
});
}
// SSE 数据解析器:提取 data 字段并解析 JSON
function createSSEDataParser() {
return new TransformStream({
transform(line, controller) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
controller.enqueue('[DONE]');
return;
}
try {
controller.enqueue(JSON.parse(data));
} catch (e) {
console.warn('SSE JSON 解析失败,跳过:', data);
}
}
// 忽略 event:、id:、retry: 等其他 SSE 字段
}
});
}
// 使用示例
const fullResponse = await streamLLMResponse(
'https://api.deepseek.com/v1/chat/completions',
'sk-xxx',
[{ role: 'user', content: '用一句话解释什么是背压' }],
(token, fullText) => {
// 在 UI 中实时显示
document.getElementById('output').textContent = fullText;
}
);
⚠️ **警告:**上面的 SSE 解码器假设每行以
\n分隔。实际场景中,一个 chunk 可能包含多个 SSE 事件,也可能一个事件被切分成多个 chunk。createSSEDecoder中的buffer机制正是为了解决这个问题——它确保只有完整的行才会被传递给下游。
3.2 进阶:带超时和取消的流式请求
生产环境中,你需要处理网络超时、用户取消、错误重试等情况:
// 带超时和取消的 LLM 流式请求
async function streamLLMWithAbort(apiUrl, apiKey, messages, options = {}) {
const {
timeout = 30000,
signal,
onToken,
onError,
retryCount = 2
} = options;
// 创建 AbortController 支持外部取消和超时
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
// 如果外部传入 signal,监听它
if (signal) {
signal.addEventListener('abort', () => controller.abort());
}
let attempt = 0;
let lastError;
while (attempt <= retryCount) {
try {
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`,
},
body: JSON.stringify({ model: 'deepseek-chat', messages, stream: true }),
signal: controller.signal,
});
clearTimeout(timeoutId);
if (!response.ok) {
const errorBody = await response.text();
throw new Error(`API ${response.status}: ${errorBody}`);
}
// 流式读取
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullText = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') return fullText;
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content || '';
if (token) {
fullText += token;
onToken?.(token, fullText);
}
} catch { /* 跳过格式异常的行 */ }
}
}
return fullText;
} catch (error) {
clearTimeout(timeoutId);
lastError = error;
// AbortError 不重试(用户主动取消或超时)
if (error.name === 'AbortError') {
throw new Error(signal?.aborted ? '用户取消' : '请求超时');
}
// 网络错误或 5xx 错误才重试
if (attempt < retryCount && isRetryableError(error)) {
attempt++;
const delay = Math.min(1000 * Math.pow(2, attempt), 10000); // 指数退避
console.warn(`第 ${attempt} 次重试,等待 ${delay}ms...`);
onError?.(error, attempt);
await new Promise(r => setTimeout(r, delay));
continue;
}
throw error;
}
}
throw lastError;
}
function isRetryableError(error) {
// 网络错误或服务端错误可重试
return error.message.includes('Failed to fetch') ||
error.message.includes('500') ||
error.message.includes('502') ||
error.message.includes('503');
}
// 使用示例:带取消按钮
const abortController = new AbortController();
document.getElementById('stopBtn').onclick = () => abortController.abort();
streamLLMWithAbort(
'https://api.deepseek.com/v1/chat/completions',
'sk-xxx',
[{ role: 'user', content: '详细解释 Web Streams API' }],
{
timeout: 60000,
signal: abortController.signal,
onToken: (token, full) => renderMarkdown(full),
onError: (err, attempt) => showToast(`重试中 (${attempt}/2)...`),
}
).then(text => console.log('完成'))
.catch(err => console.error('失败:', err.message));
🔧 四、更多实战场景
4.1 大文件分片上传
// 流式分片上传:将大文件切成固定大小的 chunk 逐个上传
async function streamUpload(file, uploadUrl, chunkSize = 1024 * 1024) {
const totalChunks = Math.ceil(file.size / chunkSize);
let uploadedBytes = 0;
// 将 File 对象转换为 ReadableStream
const fileStream = file.stream();
// 创建分片 TransformStream
const chunker = new TransformStream({
transform(chunk, controller) {
// chunk 来自 File.stream(),通常是 Uint8Array
// 这里简单传递,实际分片由上游控制
controller.enqueue(chunk);
}
});
// 更实用的方式:使用 slice + fetch 逐块上传
for (let i = 0; i < totalChunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);
const response = await fetch(uploadUrl, {
method: 'PATCH',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Range': `bytes ${start}-${end - 1}/${file.size}`,
'X-Chunk-Index': String(i),
'X-Total-Chunks': String(totalChunks),
},
body: chunk,
});
if (!response.ok) {
throw new Error(`分片 ${i} 上传失败: ${response.status}`);
}
uploadedBytes += chunk.size;
const progress = (uploadedBytes / file.size * 100).toFixed(1);
console.log(`上传进度: ${progress}% (${uploadedBytes}/${file.size})`);
}
console.log('✅ 上传完成');
}
4.2 实时日志流处理管道
// 实时日志流处理管道:过滤 → 解析 → 聚合 → 展示
function createLogProcessingPipeline() {
// 第一级:过滤掉 DEBUG 级别日志
const logFilter = new TransformStream({
transform(line, controller) {
if (!line.includes('[DEBUG]')) {
controller.enqueue(line);
}
}
});
// 第二级:解析日志结构
const logParser = new TransformStream({
transform(line, controller) {
// 格式: [2026-06-04T10:30:00Z] [ERROR] module=message
const match = line.match(/^\[(.+?)\]\s*\[(\w+)\]\s*(.+)$/);
if (match) {
controller.enqueue({
timestamp: new Date(match[1]),
level: match[2],
message: match[3],
});
}
}
});
// 第三级:错误日志聚合(每 5 秒输出一次统计)
const errorAggregator = new TransformStream({
construct(controller) {
this.errorCount = 0;
this.errors = [];
this.timer = setInterval(() => {
if (this.errors.length > 0) {
controller.enqueue({
type: 'error_summary',
count: this.errorCount,
recent: this.errors.slice(-5),
timestamp: new Date(),
});
this.errors = [];
}
}, 5000);
},
transform(log, controller) {
if (log.level === 'ERROR' || log.level === 'FATAL') {
this.errorCount++;
this.errors.push(log);
}
// 所有日志原样传递给下游(用于实时展示)
controller.enqueue(log);
},
flush() {
clearInterval(this.timer);
}
});
return { logFilter, logParser, errorAggregator };
}
// 使用示例
const { logFilter, logParser, errorAggregator } = createLogProcessingPipeline();
// 假设 logStream 来自 SSE 或 WebSocket
logStream
.pipeThrough(logFilter)
.pipeThrough(logParser)
.pipeThrough(errorAggregator)
.pipeTo(new WritableStream({
write(log) {
if (log.type === 'error_summary') {
console.warn(`⚠️ 过去 5 秒内 ${log.count} 条错误日志`);
}
}
}));
📊 五、Web Streams vs 其他方案对比
在选择流式处理方案时,开发者常常在多种技术之间犹豫。以下是主流方案的对比:
| 对比维度 | Web Streams API | SSE (EventSource) | WebSocket | RxJS Observable |
|---|---|---|---|---|
| ✅ 标准化 | W3C 标准 | W3C 标准 | W3C 标准 | 第三方库 |
| ✅ 背压支持 | 原生支持 | ❌ 不支持 | ❌ 不支持 | 通过策略模拟 |
| ✅ 可组合性 | pipeTo/pipeThrough 链式组合 | ❌ 单一通道 | ❌ 单一通道 | ✅ 操作符链式组合 |
| ✅ 浏览器原生 | ✅ 无需 polyfill | ✅ 无需 polyfill | ✅ 无需 polyfill | ❌ 需要引入库(~16KB gzipped) |
| ✅ Node.js 兼容 | ✅ 18+ 原生支持 | ✅ 需第三方库 | ✅ 需第三方库 | ✅ 通用 |
| ❌ 适用方向 | 通用数据流处理 | 服务端→客户端单向推送 | 全双工实时通信 | 复杂异步数据流 |
| ❌ 主要限制 | 学习曲线较陡 | 只能服务端推送 | 协议开销较大 | 包体积大,心智负担重 |
⚡ **关键结论:**如果你的需求是「处理 fetch 响应流」或「构建数据转换管道」,Web Streams API 是最佳选择。如果只是简单的服务端推送,SSE 更简单。如果需要双向实时通信,WebSocket 更合适。三者不是竞争关系,而是各自解决不同层次的问题。
⚠️ 六、常见坑点与避坑指南
在生产环境中使用 Web Streams API,以下几个坑点值得特别注意:
坑点一:SSE 的 chunk 边界问题。 SSE 数据通过 TCP 传输,一个 TCP 包可能包含多个 SSE 事件,也可能一个 SSE 事件被拆成多个 TCP 包。永远不要假设一次 reader.read() 返回一个完整的 SSE 事件。上面代码中的 buffer 累积机制正是为了解决这个问题。
坑点二:流的锁定(Locking)。 一个 ReadableStream 同一时刻只能有一个 reader。如果你调用了 getReader(),在释放 reader 之前再次调用会抛出 TypeError: ReadableStream is locked。
// ❌ 错误写法:重复获取 reader
const reader = stream.getReader();
const reader2 = stream.getReader(); // 💥 TypeError!
// ✅ 正确写法:先释放再获取,或用 tee() 分叉
const [stream1, stream2] = stream.tee();
const reader1 = stream1.getReader();
const reader2 = stream2.getReader();
坑点三:tee() 的内存代价。 stream.tee() 会创建两个独立的副本,内部会缓存尚未被较慢消费者读取的数据。如果两个分支的消费速度差异很大,内存占用会持续增长。在高吞吐场景下慎用 tee()。
坑点四:浏览器兼容性注意。 虽然所有主流浏览器都支持 Web Streams API,但 Safari 16.4 之前的版本不支持 ReadableStream.from()。如果你需要支持旧版 Safari,使用 polyfill 或手动创建 ReadableStream。
⚠️ **警告:**在 Service Worker 中使用 Web Streams API 时,
response.body的流一旦被读取就不能再次读取。如果你需要同时读取响应体并将其缓存,使用response.clone()或stream.tee()。
💡 七、总结与建议
Web Streams API 是现代 Web 开发中被严重低估的基础设施。它不仅仅是「fetch 响应的读取方式」,而是一套完整的数据流处理范式。我的建议是:
- ✅ LLM 应用必须用流式输出——用户体验差距是数量级的,0.5 秒看到第一个字 vs 15 秒后才看到全部内容,前者留存率高出 40%
- ✅ 大文件处理优先用流式方案——内存占用从 O(n) 降到 O(1),这是质变而非量变
- ✅ 用 TransformStream 构建数据管道——比手动回调嵌套优雅得多,且天然支持背压
- ❌ 不要用 SSE 替代 WebSocket 做双向通信——SSE 只能服务端推送到客户端
- ❌ 不要忽略错误处理和取消机制——生产环境中,流的中断和恢复是常态
如果你正在构建 AI 应用,强烈建议将流式响应处理封装成一个可复用的 hook 或 composable。以 Vue 3 为例:
// composables/useLLMStream.js
import { ref, readonly } from 'vue';
export function useLLMStream() {
const text = ref('');
const isStreaming = ref(false);
const error = ref(null);
let abortController = null;
async function send(apiUrl, apiKey, messages) {
abortController?.abort();
abortController = new AbortController();
text.value = '';
isStreaming.value = true;
error.value = null;
try {
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`,
},
body: JSON.stringify({ model: 'deepseek-chat', messages, stream: true }),
signal: abortController.signal,
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
try {
const token = JSON.parse(line.slice(6)).choices?.[0]?.delta?.content;
if (token) text.value += token;
} catch {}
}
}
} catch (e) {
if (e.name !== 'AbortError') error.value = e.message;
} finally {
isStreaming.value = false;
}
}
function stop() {
abortController?.abort();
}
return { text: readonly(text), isStreaming: readonly(isStreaming), error: readonly(error), send, stop };
}
Web Streams API 的学习曲线确实比 await response.json() 陡峭,但一旦掌握,你会发现它能解决的问题远比你想象的多——从 LLM 流式响应到大文件处理,从实时日志管道到音视频流处理,这套 API 都是浏览器端最优雅的解决方案。
🔗 相关工具与资源
- 📖 MDN Web Streams API 文档 — 最权威的 API 参考
- 🔧 web-streams-polyfill — 旧浏览器 polyfill
- 📖 Node.js Streams 文档 — Node.js 端的流 API(可通过
Readable.toWeb()转换) - 🔧 DeepSeek API 文档 — 国内性价比最高的 LLM 流式 API
- 🔧 jsjson.com 在线 JSON 格式化工具 — 处理 API 响应数据