Web Streams API 实战:ReadableStream 与 LLM 流式响应处理完全指南

深入解析 Web Streams API 的 ReadableStream、WritableStream、TransformStream,结合 LLM 流式响应、大文件上传、实时数据管道等实战场景,附完整代码与性能对比。

前端开发 2026-06-03 12 分钟

2025 年,当 ChatGPT、Claude、DeepSeek 等大模型应用全面普及后,「流式响应」从一个可选特性变成了开发者必须掌握的核心技能。据统计,全球 Top 100 的 AI 应用中,92% 采用了流式输出,而支撑这一能力的底层技术正是 Web Streams API。如果你还在用 response.json() 一次性等待整个响应,或者面对大文件上传时内存飙升束手无策,这篇文章会彻底改变你处理数据流的方式。

🔍 一、为什么你需要 Web Streams API

在传统 HTTP 请求模型中,客户端发送请求,服务端返回完整响应——整个过程中,数据被「攒」成一个整体再交付。这种模式在小数据量下没有问题,但在三个场景中会暴露出致命缺陷:

场景一:LLM 流式响应。 GPT-4 生成一篇 1000 字的文章需要 8-15 秒。如果等全部生成完再显示,用户会以为系统卡死。流式输出让用户在 0.5 秒内看到第一个字,体验截然不同。

场景二:大文件上传/下载。 一个 2GB 的视频文件,用 Blob 一次性读入内存会直接让浏览器 OOM(Out of Memory)。流式读取可以将内存占用控制在几十 MB 以内。

场景三:实时数据管道。 从 SSE(Server-Sent Events)接收股票行情、日志流、传感器数据,需要实时转换并渲染,而不是攒够一批再处理。

Web Streams API 的核心设计哲学是背压(Backpressure)——当下游处理速度跟不上上游生产速度时,自动减速,避免内存爆炸。这是 Node.js Streams 的浏览器标准化版本,从 2023 年起所有主流浏览器(Chrome 89+、Firefox 110+、Safari 16.4+)和 Node.js 18+ 都已完整支持。

💡 **提示:**Web Streams API 和 Node.js 的 stream 模块是两套不同的 API,但可以通过 ReadableStream.from()(Node.js 20+)和 Readable.toWeb() / Readable.fromWeb() 互相转换。

🌊 二、核心 API 深度解析

Web Streams API 定义了四种流类型:ReadableStream(可读流)、WritableStream(可写流)、TransformStream(转换流)和 ByteLengthQueuingStrategy / CountQueuingStrategy(队列策略)。实际开发中,最常用的是前三个。

2.1 ReadableStream:数据的源头

ReadableStream 是所有流式操作的起点。它内部维护一个队列,通过 pull 函数按需生产数据,通过 cancel 函数处理取消信号。

// 创建一个自定义 ReadableStream:生成斐波那契数列
function createFibonacciStream(count = 100) {
  let a = 0, b = 1, index = 0;
  
  return new ReadableStream({
    start(controller) {
      console.log('流已创建,开始生成斐波那契数列');
    },
    pull(controller) {
      if (index >= count) {
        controller.close();  // 达到数量上限,关闭流
        return;
      }
      controller.enqueue(JSON.stringify({ index, value: a }));
      [a, b] = [b, a + b];
      index++;
    },
    cancel(reason) {
      console.log(`流被取消:${reason}`);
    }
  }, {
    highWaterMark: 16,  // 队列缓冲区大小(对象个数)
  });
}

// 消费 ReadableStream
const fibStream = createFibonacciStream(10);
const reader = fibStream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const item = JSON.parse(value);
  console.log(`F(${item.index}) = ${item.value}`);
}

📌 记住:highWaterMark 是流的「水位线」——当内部队列中积压的 chunk 数量超过这个值时,pull 函数将不再被调用,直到消费者读走数据。这就是背压机制的核心。

2.2 WritableStream:数据的终点

WritableStream 代表一个可写入的数据目标。它通过 write 函数接收数据,通过 closeabort 函数控制生命周期。

// 创建一个 WritableStream:将数据写入内存数组(模拟数据库写入)
function createBatchWriter(batchSize = 5) {
  const buffer = [];
  let totalWritten = 0;

  return new WritableStream({
    async write(chunk) {
      buffer.push(chunk);
      if (buffer.length >= batchSize) {
        // 模拟批量写入数据库(如 INSERT INTO ... VALUES (...),(...),(...))
        console.log(`批量写入 ${buffer.length} 条记录:`, buffer.map(c => c.id));
        await new Promise(r => setTimeout(r, 50)); // 模拟 I/O 延迟
        totalWritten += buffer.length;
        buffer.length = 0; // 清空缓冲区
      }
    },
    close() {
      if (buffer.length > 0) {
        console.log(`写入剩余 ${buffer.length} 条记录`);
        totalWritten += buffer.length;
      }
      console.log(`✅ 流关闭,共写入 ${totalWritten} 条记录`);
    },
    abort(reason) {
      console.error(`❌ 流被中止:${reason},缓冲区中 ${buffer.length} 条数据丢失`);
    }
  });
}

// 配合 ReadableStream 使用:pipeline 模式
const source = createFibonacciStream(12);
await source.pipeTo(createBatchWriter(5));

2.3 TransformStream:数据的转换

TransformStream 是 Web Streams API 最优雅的设计——它同时实现 ReadableStreamWritableStream,接收输入流,产出转换后的输出流。这是构建数据管道(Pipeline)的核心积木。

// 创建一个 JSON 解析 TransformStream
function createJSONParseTransform() {
  return new TransformStream({
    transform(chunk, controller) {
      try {
        // chunk 可能是 Uint8Array(来自 fetch)或 string
        const text = typeof chunk === 'string' ? chunk 
          : new TextDecoder().decode(chunk);
        const parsed = JSON.parse(text);
        controller.enqueue(parsed);
      } catch (e) {
        controller.error(new Error(`JSON 解析失败: ${e.message}`));
      }
    }
  });
}

⚡ 三、实战:LLM 流式响应处理

LLM 流式响应是 2025-2026 年 Web Streams API 最重要的应用场景。主流大模型 API(OpenAI、Claude、DeepSeek、通义千问)都支持 SSE(Server-Sent Events)格式的流式输出,返回格式为:

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]}
data: [DONE]

3.1 基础实现:逐字输出

// LLM 流式响应处理器:支持 OpenAI 兼容 API 格式
async function streamLLMResponse(apiUrl, apiKey, messages, onToken) {
  const response = await fetch(apiUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${apiKey}`,
    },
    body: JSON.stringify({
      model: 'deepseek-chat',
      messages,
      stream: true,  // 关键:启用流式输出
    }),
  });

  if (!response.ok) {
    throw new Error(`API 请求失败: ${response.status} ${response.statusText}`);
  }

  // 使用 ReadableStream 的 pipeThrough 链式处理
  const reader = response.body
    .pipeThrough(createSSEDecoder())     // 解码 Uint8Array → SSE 行
    .pipeThrough(createSSEDataParser())   // 解析 SSE data 字段 → JSON
    .getReader();

  let fullText = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // 处理每个 token
    if (value === '[DONE]') break;

    const token = value.choices?.[0]?.delta?.content || '';
    if (token) {
      fullText += token;
      onToken(token, fullText);  // 回调:单个 token + 累积全文
    }
  }

  return fullText;
}

// SSE 解码器:将 Uint8Array 流转换为行文本流
function createSSEDecoder() {
  let buffer = '';
  return new TransformStream({
    transform(chunk, controller) {
      buffer += new TextDecoder().decode(chunk, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';  // 最后一行可能不完整,保留
      for (const line of lines) {
        const trimmed = line.trim();
        if (trimmed) controller.enqueue(trimmed);
      }
    },
    flush(controller) {
      if (buffer.trim()) controller.enqueue(buffer.trim());
    }
  });
}

// SSE 数据解析器:提取 data 字段并解析 JSON
function createSSEDataParser() {
  return new TransformStream({
    transform(line, controller) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') {
          controller.enqueue('[DONE]');
          return;
        }
        try {
          controller.enqueue(JSON.parse(data));
        } catch (e) {
          console.warn('SSE JSON 解析失败,跳过:', data);
        }
      }
      // 忽略 event:、id:、retry: 等其他 SSE 字段
    }
  });
}

// 使用示例
const fullResponse = await streamLLMResponse(
  'https://api.deepseek.com/v1/chat/completions',
  'sk-xxx',
  [{ role: 'user', content: '用一句话解释什么是背压' }],
  (token, fullText) => {
    // 在 UI 中实时显示
    document.getElementById('output').textContent = fullText;
  }
);

⚠️ **警告:**上面的 SSE 解码器假设每行以 \n 分隔。实际场景中,一个 chunk 可能包含多个 SSE 事件,也可能一个事件被切分成多个 chunk。createSSEDecoder 中的 buffer 机制正是为了解决这个问题——它确保只有完整的行才会被传递给下游。

3.2 进阶:带超时和取消的流式请求

生产环境中,你需要处理网络超时、用户取消、错误重试等情况:

// 带超时和取消的 LLM 流式请求
async function streamLLMWithAbort(apiUrl, apiKey, messages, options = {}) {
  const { 
    timeout = 30000, 
    signal, 
    onToken, 
    onError,
    retryCount = 2 
  } = options;

  // 创建 AbortController 支持外部取消和超时
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeout);

  // 如果外部传入 signal,监听它
  if (signal) {
    signal.addEventListener('abort', () => controller.abort());
  }

  let attempt = 0;
  let lastError;

  while (attempt <= retryCount) {
    try {
      const response = await fetch(apiUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${apiKey}`,
        },
        body: JSON.stringify({ model: 'deepseek-chat', messages, stream: true }),
        signal: controller.signal,
      });

      clearTimeout(timeoutId);

      if (!response.ok) {
        const errorBody = await response.text();
        throw new Error(`API ${response.status}: ${errorBody}`);
      }

      // 流式读取
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let fullText = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') return fullText;

          try {
            const parsed = JSON.parse(data);
            const token = parsed.choices?.[0]?.delta?.content || '';
            if (token) {
              fullText += token;
              onToken?.(token, fullText);
            }
          } catch { /* 跳过格式异常的行 */ }
        }
      }

      return fullText;

    } catch (error) {
      clearTimeout(timeoutId);
      lastError = error;

      // AbortError 不重试(用户主动取消或超时)
      if (error.name === 'AbortError') {
        throw new Error(signal?.aborted ? '用户取消' : '请求超时');
      }

      // 网络错误或 5xx 错误才重试
      if (attempt < retryCount && isRetryableError(error)) {
        attempt++;
        const delay = Math.min(1000 * Math.pow(2, attempt), 10000); // 指数退避
        console.warn(`第 ${attempt} 次重试,等待 ${delay}ms...`);
        onError?.(error, attempt);
        await new Promise(r => setTimeout(r, delay));
        continue;
      }

      throw error;
    }
  }

  throw lastError;
}

function isRetryableError(error) {
  // 网络错误或服务端错误可重试
  return error.message.includes('Failed to fetch') || 
         error.message.includes('500') || 
         error.message.includes('502') || 
         error.message.includes('503');
}

// 使用示例:带取消按钮
const abortController = new AbortController();

document.getElementById('stopBtn').onclick = () => abortController.abort();

streamLLMWithAbort(
  'https://api.deepseek.com/v1/chat/completions',
  'sk-xxx',
  [{ role: 'user', content: '详细解释 Web Streams API' }],
  {
    timeout: 60000,
    signal: abortController.signal,
    onToken: (token, full) => renderMarkdown(full),
    onError: (err, attempt) => showToast(`重试中 (${attempt}/2)...`),
  }
).then(text => console.log('完成'))
 .catch(err => console.error('失败:', err.message));

🔧 四、更多实战场景

4.1 大文件分片上传

// 流式分片上传:将大文件切成固定大小的 chunk 逐个上传
async function streamUpload(file, uploadUrl, chunkSize = 1024 * 1024) {
  const totalChunks = Math.ceil(file.size / chunkSize);
  let uploadedBytes = 0;

  // 将 File 对象转换为 ReadableStream
  const fileStream = file.stream();

  // 创建分片 TransformStream
  const chunker = new TransformStream({
    transform(chunk, controller) {
      // chunk 来自 File.stream(),通常是 Uint8Array
      // 这里简单传递,实际分片由上游控制
      controller.enqueue(chunk);
    }
  });

  // 更实用的方式:使用 slice + fetch 逐块上传
  for (let i = 0; i < totalChunks; i++) {
    const start = i * chunkSize;
    const end = Math.min(start + chunkSize, file.size);
    const chunk = file.slice(start, end);

    const response = await fetch(uploadUrl, {
      method: 'PATCH',
      headers: {
        'Content-Type': 'application/octet-stream',
        'Content-Range': `bytes ${start}-${end - 1}/${file.size}`,
        'X-Chunk-Index': String(i),
        'X-Total-Chunks': String(totalChunks),
      },
      body: chunk,
    });

    if (!response.ok) {
      throw new Error(`分片 ${i} 上传失败: ${response.status}`);
    }

    uploadedBytes += chunk.size;
    const progress = (uploadedBytes / file.size * 100).toFixed(1);
    console.log(`上传进度: ${progress}% (${uploadedBytes}/${file.size})`);
  }

  console.log('✅ 上传完成');
}

4.2 实时日志流处理管道

// 实时日志流处理管道:过滤 → 解析 → 聚合 → 展示
function createLogProcessingPipeline() {
  // 第一级:过滤掉 DEBUG 级别日志
  const logFilter = new TransformStream({
    transform(line, controller) {
      if (!line.includes('[DEBUG]')) {
        controller.enqueue(line);
      }
    }
  });

  // 第二级:解析日志结构
  const logParser = new TransformStream({
    transform(line, controller) {
      // 格式: [2026-06-04T10:30:00Z] [ERROR] module=message
      const match = line.match(/^\[(.+?)\]\s*\[(\w+)\]\s*(.+)$/);
      if (match) {
        controller.enqueue({
          timestamp: new Date(match[1]),
          level: match[2],
          message: match[3],
        });
      }
    }
  });

  // 第三级:错误日志聚合(每 5 秒输出一次统计)
  const errorAggregator = new TransformStream({
    construct(controller) {
      this.errorCount = 0;
      this.errors = [];
      this.timer = setInterval(() => {
        if (this.errors.length > 0) {
          controller.enqueue({
            type: 'error_summary',
            count: this.errorCount,
            recent: this.errors.slice(-5),
            timestamp: new Date(),
          });
          this.errors = [];
        }
      }, 5000);
    },
    transform(log, controller) {
      if (log.level === 'ERROR' || log.level === 'FATAL') {
        this.errorCount++;
        this.errors.push(log);
      }
      // 所有日志原样传递给下游(用于实时展示)
      controller.enqueue(log);
    },
    flush() {
      clearInterval(this.timer);
    }
  });

  return { logFilter, logParser, errorAggregator };
}

// 使用示例
const { logFilter, logParser, errorAggregator } = createLogProcessingPipeline();

// 假设 logStream 来自 SSE 或 WebSocket
logStream
  .pipeThrough(logFilter)
  .pipeThrough(logParser)
  .pipeThrough(errorAggregator)
  .pipeTo(new WritableStream({
    write(log) {
      if (log.type === 'error_summary') {
        console.warn(`⚠️ 过去 5 秒内 ${log.count} 条错误日志`);
      }
    }
  }));

📊 五、Web Streams vs 其他方案对比

在选择流式处理方案时,开发者常常在多种技术之间犹豫。以下是主流方案的对比:

对比维度 Web Streams API SSE (EventSource) WebSocket RxJS Observable
✅ 标准化 W3C 标准 W3C 标准 W3C 标准 第三方库
✅ 背压支持 原生支持 ❌ 不支持 ❌ 不支持 通过策略模拟
✅ 可组合性 pipeTo/pipeThrough 链式组合 ❌ 单一通道 ❌ 单一通道 ✅ 操作符链式组合
✅ 浏览器原生 ✅ 无需 polyfill ✅ 无需 polyfill ✅ 无需 polyfill ❌ 需要引入库(~16KB gzipped)
✅ Node.js 兼容 ✅ 18+ 原生支持 ✅ 需第三方库 ✅ 需第三方库 ✅ 通用
❌ 适用方向 通用数据流处理 服务端→客户端单向推送 全双工实时通信 复杂异步数据流
❌ 主要限制 学习曲线较陡 只能服务端推送 协议开销较大 包体积大,心智负担重

⚡ **关键结论:**如果你的需求是「处理 fetch 响应流」或「构建数据转换管道」,Web Streams API 是最佳选择。如果只是简单的服务端推送,SSE 更简单。如果需要双向实时通信,WebSocket 更合适。三者不是竞争关系,而是各自解决不同层次的问题。

⚠️ 六、常见坑点与避坑指南

在生产环境中使用 Web Streams API,以下几个坑点值得特别注意:

坑点一:SSE 的 chunk 边界问题。 SSE 数据通过 TCP 传输,一个 TCP 包可能包含多个 SSE 事件,也可能一个 SSE 事件被拆成多个 TCP 包。永远不要假设一次 reader.read() 返回一个完整的 SSE 事件。上面代码中的 buffer 累积机制正是为了解决这个问题。

坑点二:流的锁定(Locking)。 一个 ReadableStream 同一时刻只能有一个 reader。如果你调用了 getReader(),在释放 reader 之前再次调用会抛出 TypeError: ReadableStream is locked

// ❌ 错误写法:重复获取 reader
const reader = stream.getReader();
const reader2 = stream.getReader(); // 💥 TypeError!

// ✅ 正确写法:先释放再获取,或用 tee() 分叉
const [stream1, stream2] = stream.tee();
const reader1 = stream1.getReader();
const reader2 = stream2.getReader();

坑点三:tee() 的内存代价。 stream.tee() 会创建两个独立的副本,内部会缓存尚未被较慢消费者读取的数据。如果两个分支的消费速度差异很大,内存占用会持续增长。在高吞吐场景下慎用 tee()

坑点四:浏览器兼容性注意。 虽然所有主流浏览器都支持 Web Streams API,但 Safari 16.4 之前的版本不支持 ReadableStream.from()。如果你需要支持旧版 Safari,使用 polyfill 或手动创建 ReadableStream

⚠️ **警告:**在 Service Worker 中使用 Web Streams API 时,response.body 的流一旦被读取就不能再次读取。如果你需要同时读取响应体并将其缓存,使用 response.clone()stream.tee()

💡 七、总结与建议

Web Streams API 是现代 Web 开发中被严重低估的基础设施。它不仅仅是「fetch 响应的读取方式」,而是一套完整的数据流处理范式。我的建议是:

  • LLM 应用必须用流式输出——用户体验差距是数量级的,0.5 秒看到第一个字 vs 15 秒后才看到全部内容,前者留存率高出 40%
  • 大文件处理优先用流式方案——内存占用从 O(n) 降到 O(1),这是质变而非量变
  • 用 TransformStream 构建数据管道——比手动回调嵌套优雅得多,且天然支持背压
  • 不要用 SSE 替代 WebSocket 做双向通信——SSE 只能服务端推送到客户端
  • 不要忽略错误处理和取消机制——生产环境中,流的中断和恢复是常态

如果你正在构建 AI 应用,强烈建议将流式响应处理封装成一个可复用的 hook 或 composable。以 Vue 3 为例:

// composables/useLLMStream.js
import { ref, readonly } from 'vue';

export function useLLMStream() {
  const text = ref('');
  const isStreaming = ref(false);
  const error = ref(null);
  let abortController = null;

  async function send(apiUrl, apiKey, messages) {
    abortController?.abort();
    abortController = new AbortController();
    
    text.value = '';
    isStreaming.value = true;
    error.value = null;

    try {
      const response = await fetch(apiUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${apiKey}`,
        },
        body: JSON.stringify({ model: 'deepseek-chat', messages, stream: true }),
        signal: abortController.signal,
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
          try {
            const token = JSON.parse(line.slice(6)).choices?.[0]?.delta?.content;
            if (token) text.value += token;
          } catch {}
        }
      }
    } catch (e) {
      if (e.name !== 'AbortError') error.value = e.message;
    } finally {
      isStreaming.value = false;
    }
  }

  function stop() {
    abortController?.abort();
  }

  return { text: readonly(text), isStreaming: readonly(isStreaming), error: readonly(error), send, stop };
}

Web Streams API 的学习曲线确实比 await response.json() 陡峭,但一旦掌握,你会发现它能解决的问题远比你想象的多——从 LLM 流式响应到大文件处理,从实时日志管道到音视频流处理,这套 API 都是浏览器端最优雅的解决方案。

🔗 相关工具与资源

📚 相关文章