CHAPTER 02

Streaming Response Handling

Implement SSE/chunk streaming output and render LLM replies character by character

1. Why We Need Streaming Responses

The chat method from the previous chapter is "all-at-once": we send a request, wait for the LLM to finish generating everything, and only then get the whole reply back. That is fine for short replies, but LLMs routinely generate hundreds or even thousands of tokens, and the wait can stretch to 10-30 seconds.

Streaming solves this: as soon as the LLM produces a small piece of text, it is sent to the client immediately, so the user watches the words appear one by one and perceived latency drops from tens of seconds to under a second. This is the "typewriter effect" you see in the ChatGPT and Claude chat UIs.

[Diagram: a non-streaming request waits 10-30s before the full response arrives; a streaming request delivers chunk1, chunk2, chunk3, chunk4, ... with roughly 0.5s first-token latency]
The technical substance of streaming
Streaming is nothing new as a transport technique. Anthropic uses the SSE (Server-Sent Events) protocol, and OpenAI uses a similar mechanism. The HTTP response's Content-Type is text/event-stream; the server keeps sending event lines that start with data:, and the client reads and parses them line by line.
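
Schematically, the raw bytes on the wire look something like this (an illustrative trace in Anthropic's style, not copied from a real response; the exact field layout varies by provider and API version):

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hel"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"lo"}}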

For an Agent, streaming is not just cosmetic. It has a practical benefit: the user can interrupt while the LLM is still generating and avoid wasting tokens.

2. The StreamEvent Type

In the previous chapter's type definitions file, we already defined StreamEvent:

// Streaming event types
export interface StreamEvent {
  type: "message_start" | "text_delta" | "message_stop" | "error";
  text?: string;
}
@dataclass
class StreamEvent:
    """Event emitted during streaming."""
    type: str  # "message_start", "text_delta", "message_stop", "error"
    text: str | None = None

These four event types make up the complete lifecycle of stream communication:

| Event | Meaning | text field |
| --- | --- | --- |
| message_start | Stream has started; get ready to receive | - |
| text_delta | A small piece of text has arrived | the incremental text |
| message_stop | Stream has ended; generation is complete | - |
| error | Something went wrong | the error message |

The event sequence is always message_start → some number of text_delta events → message_stop. The full text is the concatenation of the text fields of all text_delta events.
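
To make that concrete, here is a minimal consumer sketch (collectText is our name for an illustrative helper, not part of the project code):

// Minimal sketch: reassemble the full reply from a stream of StreamEvents
async function collectText(stream: AsyncIterable<StreamEvent>): Promise<string> {
  let fullText = "";
  for await (const event of stream) {
    if (event.type === "text_delta" && event.text) fullText += event.text;
  }
  return fullText;
}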

3. Anthropic's Streaming Protocol

The Anthropic SDK provides a messages.stream() method that returns an async iterable. Underneath, it is the SSE protocol; the SDK has already parsed it for us.

Let's look at the implementation of AnthropicProvider.stream:

async *stream(
  messages: Message[],
  options?: ChatOptions
): AsyncIterable<StreamEvent> {
  const params: Record<string, unknown> = {
    model: this.model,
    max_tokens: options?.maxTokens ?? 4096,
    messages: messages.map((m) => ({ role: m.role, content: m.content })),
  };
  if (options?.system) params.system = options.system;

  const stream = this.client.messages.stream(
    params as Anthropic.MessageCreateParamsNonStreaming
  );

  yield { type: "message_start" };

  // Filter the raw SSE events: only text deltas carry reply content
  for await (const event of stream) {
    if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      yield { type: "text_delta", text: event.delta.text };
    }
  }

  yield { type: "message_stop" };
}
async def stream(
    self, messages: list[Message], options: ChatOptions | None = None
) -> AsyncIterator[StreamEvent]:
    options = options or ChatOptions()

    params: dict = {
        "model": self._model,
        "max_tokens": options.max_tokens or 4096,
        "messages": [
            {"role": m.role, "content": m.content} for m in messages
        ],
    }
    if options.system:
        params["system"] = options.system

    yield StreamEvent(type="message_start")

    # messages.stream() returns an async context manager; iterate events inside it
    async with self._client.messages.stream(**params) as stream:
        async for event in stream:
            if (
                event.type == "content_block_delta"
                and event.delta.type == "text_delta"
            ):
                yield StreamEvent(type="text_delta", text=event.delta.text)

    yield StreamEvent(type="message_stop")

Note the async * syntax: this method is an async generator, so it can yield events one at a time, and callers consume it with for await...of.

Anthropic's raw SSE stream has many event types (message_start, content_block_start, content_block_delta, message_stop, and so on). We only care about content_block_delta events whose delta type is "text_delta"; they carry the actual text increments.
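
Putting the provider to work looks like this (a sketch: the constructor shape matches the tests later in this chapter, while reading the key from ANTHROPIC_API_KEY is our assumption; top-level await requires an ES module):

// Sketch: typewriter effect in a terminal
const provider = new AnthropicProvider({ apiKey: process.env.ANTHROPIC_API_KEY! });
for await (const event of provider.stream([{ role: "user", content: "Hi" }])) {
  if (event.type === "text_delta" && event.text) process.stdout.write(event.text);
}
process.stdout.write("\n");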

4. OpenAI's Streaming Protocol

OpenAI's streaming is also based on SSE, but the data format differs. Each chunk is a JSON object whose text lives at choices[0].delta.content.
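
Schematically, each SSE data: line carries a chunk like the ones below, and the stream ends with a [DONE] sentinel (fields trimmed for readability):

data: {"id":"chatcmpl-...","choices":[{"index":0,"delta":{"content":"Hel"}}]}
data: {"id":"chatcmpl-...","choices":[{"index":0,"delta":{"content":"lo"}}]}
data: [DONE]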

async *stream(
  messages: Message[],
  options?: ChatOptions
): AsyncIterable<StreamEvent> {
  // stream: true makes the SDK return an async iterable of chunks
  const stream = await this.client.chat.completions.create({
    model: this.model,
    max_tokens: options?.maxTokens ?? 4096,
    messages: this.formatMessages(messages, options?.system),
    stream: true,
  });

  yield { type: "message_start" };

  for await (const chunk of stream) {
    const delta = chunk.choices[0]?.delta;
    if (delta?.content) {
      yield { type: "text_delta", text: delta.content };
    }
  }

  yield { type: "message_stop" };
}
async def stream(
    self, messages: list[Message], options: ChatOptions | None = None
) -> AsyncIterator[StreamEvent]:
    options = options or ChatOptions()

    response = await self._client.chat.completions.create(
        model=self._model,
        max_tokens=options.max_tokens or 4096,
        messages=self._format_messages(messages, options.system),
        stream=True,
    )

    yield StreamEvent(type="message_start")

    async for chunk in response:
        delta = chunk.choices[0].delta if chunk.choices else None
        if delta and delta.content:
            yield StreamEvent(type="text_delta", text=delta.content)

    yield StreamEvent(type="message_stop")

Almost exactly the same pattern as Anthropic! The only differences: Anthropic has a dedicated messages.stream() method and emits content_block_delta events, while OpenAI reuses chat.completions.create() with stream: true and puts the text in choices[0].delta.content.

The two providers' stream methods have identical signatures: both return AsyncIterable<StreamEvent>. Upper-layer code never needs to know which vendor's API is underneath; it just consumes events with for await. That is the power of interface abstraction, as the sketch below shows.
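
A minimal sketch of that upper layer, assuming the LLMProvider interface from the previous chapter (printStream and msgs are hypothetical names for illustration):

// Sketch: code written against the interface works with any provider
async function printStream(provider: LLMProvider, messages: Message[]): Promise<void> {
  for await (const event of provider.stream(messages)) {
    if (event.type === "text_delta" && event.text) process.stdout.write(event.text);
  }
}

// Swapping providers requires no change to the consumer
await printStream(new AnthropicProvider({ apiKey: "..." }), msgs);
await printStream(
  new OpenAICompatibleProvider({
    apiKey: "...",
    baseURL: "https://api.deepseek.com",
    model: "deepseek-chat",
  }),
  msgs
);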

5. Async Iteration and Async Generators

Understanding async iteration is the key to understanding stream processing. Both languages have matching syntax: TypeScript's async * / for await...of, and Python's async def + yield / async for. Let's quickly go over the core concepts.

Async iterables

In TypeScript this is the AsyncIterable<T> interface; in Python it is the AsyncIterator[T] protocol. You consume one with for await...of (TS) or async for (Python).

Async generators

TypeScript uses async function*; Python uses an async def containing yield. Both let a function produce values one at a time, with the consumer pulling them on demand.

// Producer: 异步生成器
async function* countdown(n: number): AsyncIterable<number> {
  for (let i = n; i > 0; i--) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield i;
  }
}

// Consumer: for await...of (top-level await requires an ES module)
for await (const num of countdown(3)) {
  console.log(num); // 3, 2, 1 (each after 1s)
}
# Producer: async generator
import asyncio
from typing import AsyncIterator

async def countdown(n: int) -> AsyncIterator[int]:
    for i in range(n, 0, -1):
        await asyncio.sleep(1)
        yield i

# Consumer: async for (must run inside an async function)
async def main() -> None:
    async for num in countdown(3):
        print(num)  # 3, 2, 1 (each after 1s)

asyncio.run(main())
Why async iteration instead of callbacks?
You could also implement streaming with a callback: stream(msgs, callback). But async iteration has clear advantages: the consumer pulls values at its own pace, you can stop early with a plain break, errors propagate through ordinary try/catch rather than a separate error callback, and the code reads top to bottom instead of inside out.
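
For instance, early termination is a one-liner, which is also how a user interrupt avoids wasting tokens (a sketch; provider and messages are assumed to be in scope, and whether breaking also aborts the underlying HTTP request depends on the SDK):

// Sketch: stop consuming once a budget is exceeded
let chars = 0;
for await (const event of provider.stream(messages)) {
  if (event.type === "text_delta" && event.text) {
    chars += event.text.length;
    if (chars > 500) break; // exits the loop; the generator's cleanup still runs
  }
}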

6. Testing Streaming Responses

The key to testing streaming logic is constructing a mock async iterable.

import { describe, it, expect, vi } from "vitest";
import { AnthropicProvider } from "../src/llm/anthropic.js";
import { OpenAICompatibleProvider } from "../src/llm/openai-compatible.js";
import type { Message, StreamEvent } from "../src/llm/types.js";

// Helper to create async iterable from array
async function* asyncIterable<T>(items: T[]): AsyncIterable<T> {
  for (const item of items) yield item;
}

// Helper to collect all events from stream
async function collectEvents(stream: AsyncIterable<StreamEvent>): Promise<StreamEvent[]> {
  const events: StreamEvent[] = [];
  for await (const event of stream) events.push(event);
  return events;
}

// ── Anthropic Streaming ──

vi.mock("@anthropic-ai/sdk", () => {
  return {
    default: class {
      messages = {
        create: vi.fn(),
        stream: vi.fn().mockReturnValue(
          asyncIterable([
            { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } },
            { type: "content_block_delta", delta: { type: "text_delta", text: " world" } },
          ])
        ),
      };
    },
  };
});

vi.mock("openai", () => {
  return {
    default: class {
      chat = {
        completions: {
          create: vi.fn().mockResolvedValue(
            asyncIterable([
              { choices: [{ delta: { content: "Hello" } }] },
              { choices: [{ delta: { content: " world" } }] },
            ])
          ),
        },
      };
    },
  };
});

describe("AnthropicProvider streaming", () => {
  it("should yield message_start, text_delta events, and message_stop", async () => {
    const provider = new AnthropicProvider({ apiKey: "test-key" });
    const messages: Message[] = [{ role: "user", content: "Hi" }];
    const events = await collectEvents(provider.stream(messages));

    expect(events[0].type).toBe("message_start");
    expect(events[1]).toEqual({ type: "text_delta", text: "Hello" });
    expect(events[2]).toEqual({ type: "text_delta", text: " world" });
    expect(events[events.length - 1].type).toBe("message_stop");
  });

  it("should produce full text from text_delta events", async () => {
    const provider = new AnthropicProvider({ apiKey: "test-key" });
    const messages: Message[] = [{ role: "user", content: "Hi" }];
    let fullText = "";
    for await (const event of provider.stream(messages)) {
      if (event.type === "text_delta" && event.text) fullText += event.text;
    }
    expect(fullText).toBe("Hello world");
  });
});

describe("OpenAICompatibleProvider streaming", () => {
  it("should yield message_start, text_delta events, and message_stop", async () => {
    const provider = new OpenAICompatibleProvider({
      apiKey: "test-key",
      baseURL: "https://api.deepseek.com",
      model: "deepseek-chat",
    });
    const messages: Message[] = [{ role: "user", content: "Hi" }];
    const events = await collectEvents(provider.stream(messages));

    expect(events[0].type).toBe("message_start");
    expect(events[1]).toEqual({ type: "text_delta", text: "Hello" });
    expect(events[2]).toEqual({ type: "text_delta", text: " world" });
    expect(events[events.length - 1].type).toBe("message_stop");
  });

  it("should produce full text from text_delta events", async () => {
    const provider = new OpenAICompatibleProvider({
      apiKey: "test-key",
      baseURL: "https://api.deepseek.com",
      model: "deepseek-chat",
    });
    const messages: Message[] = [{ role: "user", content: "Hi" }];
    let fullText = "";
    for await (const event of provider.stream(messages)) {
      if (event.type === "text_delta" && event.text) fullText += event.text;
    }
    expect(fullText).toBe("Hello world");
  });
});
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from src.llm.anthropic_provider import AnthropicProvider, AnthropicConfig
from src.llm.openai_compatible import OpenAICompatibleProvider, OpenAICompatibleConfig
from src.llm.types import Message, StreamEvent


async def collect_events(stream) -> list[StreamEvent]:
    """Collect all events from an async iterator."""
    events = []
    async for event in stream:
        events.append(event)
    return events


@pytest.mark.asyncio
class TestAnthropicStreaming:
    async def test_yield_stream_events(self):
        with patch("src.llm.anthropic_provider.AsyncAnthropic"):
            provider = AnthropicProvider(AnthropicConfig(api_key="test-key"))

        # Mock the stream context manager
        mock_event1 = MagicMock()
        mock_event1.type = "content_block_delta"
        mock_event1.delta.type = "text_delta"
        mock_event1.delta.text = "Hello"

        mock_event2 = MagicMock()
        mock_event2.type = "content_block_delta"
        mock_event2.delta.type = "text_delta"
        mock_event2.delta.text = " world"

        mock_stream = MagicMock()
        mock_stream.__aenter__ = AsyncMock(return_value=mock_stream)
        mock_stream.__aexit__ = AsyncMock(return_value=False)
        async def async_iter():
            yield mock_event1
            yield mock_event2

        mock_stream.__aiter__ = lambda self: async_iter()

        provider._client.messages.stream = MagicMock(return_value=mock_stream)

        messages = [Message(role="user", content="Hi")]
        events = await collect_events(provider.stream(messages))

        assert events[0].type == "message_start"
        assert events[1] == StreamEvent(type="text_delta", text="Hello")
        assert events[2] == StreamEvent(type="text_delta", text=" world")
        assert events[-1].type == "message_stop"

Look closely at the asyncIterable helper: it is an async function* that takes an array, yields its elements one by one, and returns an AsyncIterable. This is the standard way to simulate streaming data in tests.

The collectEvents function does the exact opposite: it collects an AsyncIterable into a plain array, which makes assertions easy.
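
One event type the providers above never emit is error. A minimal sketch of where it could fit, assuming we wrap a provider stream (safeStream is a hypothetical helper, not part of the project code):

// Sketch: surface stream failures as error events instead of thrown exceptions
async function* safeStream(
  inner: AsyncIterable<StreamEvent>
): AsyncIterable<StreamEvent> {
  try {
    yield* inner;
  } catch (err) {
    yield { type: "error", text: err instanceof Error ? err.message : String(err) };
  }
}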

7. Running the Tests

$ npm test

✓ tests/stream.test.ts (4 tests) 8ms
  ✓ AnthropicProvider streaming > should yield message_start, text_delta events, and message_stop
  ✓ AnthropicProvider streaming > should produce full text from text_delta events
  ✓ OpenAICompatibleProvider streaming > should yield message_start, text_delta events, and message_stop
  ✓ OpenAICompatibleProvider streaming > should produce full text from text_delta events

$ cd python && python -m pytest tests/test_stream.py -v

tests/test_stream.py::TestAnthropicStreaming::test_yield_stream_events PASSED
tests/test_stream.py::TestAnthropicStreaming::test_full_text_from_deltas PASSED
tests/test_stream.py::TestOpenAICompatibleStreaming::test_yield_stream_events PASSED
tests/test_stream.py::TestOpenAICompatibleStreaming::test_full_text_from_deltas PASSED

Chapter Summary

In this chapter we covered:

  1. Streaming responses use the SSE protocol to deliver output chunk by chunk, cutting first-token latency from tens of seconds to under a second.
  2. StreamEvent has four types, and the event sequence is start → delta* → stop.
  3. Async generators (async *) are the tool of choice for implementing streaming producers, consumed with for await...of.
  4. Anthropic's and OpenAI's streaming protocols differ in their details, but behind the unified LLMProvider.stream interface, upper-layer code never notices the difference.

In the next chapter we will implement multi-turn conversation management: letting the Agent remember context and hold a continuous dialogue.