The chat method from the previous chapter is "one-shot": we send a request, wait for the LLM to finish generating everything, then receive it all at once. That is fine for short replies, but LLMs routinely produce replies of hundreds or even thousands of tokens, and the wait can stretch to 10-30 seconds.
Streaming solves this problem: the LLM pushes each small piece of text to the client as soon as it is generated, so the user watches the words appear one by one and perceived latency drops from over ten seconds to under one second. This is the "typewriter effect" you see in the ChatGPT and Claude chat UIs.
The underlying transport is Server-Sent Events (SSE): the Content-Type is text/event-stream, the server keeps sending event lines that start with data:, and the client reads and parses them line by line.
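Concretely, an SSE body is just a text stream of events separated by blank lines, with each payload carried on a data: line. The payloads below are made up for illustration only; every provider defines its own JSON shape, as we will see shortly:

data: {"type": "text_delta", "text": "Hel"}

data: {"type": "text_delta", "text": "lo"}

data: {"type": "message_stop"}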
In the previous chapter's type definition file, we already defined StreamEvent:
// Streaming event types
export interface StreamEvent {
type: "message_start" | "text_delta" | "message_stop" | "error";
text?: string;
}
from dataclasses import dataclass

@dataclass
class StreamEvent:
"""Event emitted during streaming."""
type: str # "message_start", "text_delta", "message_stop", "error"
text: str | None = None
These four event types make up the complete lifecycle of streaming communication:
| Event | Meaning | text field |
|---|---|---|
| message_start | Stream has started; prepare to receive | none |
| text_delta | A small piece of text has arrived | the incremental text |
| message_stop | Stream has ended; generation is complete | none |
| error | Something went wrong | the error message |
The event sequence is always message_start → some number of text_delta → message_stop. The full text is the concatenation of the text of every text_delta, as the snippet below demonstrates.
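As a tiny self-contained illustration (the event values here are hypothetical), reconstructing the full text is just a filter-and-join over the deltas:

const events: StreamEvent[] = [
  { type: "message_start" },
  { type: "text_delta", text: "Hello" },
  { type: "text_delta", text: " world" },
  { type: "message_stop" },
];

// Keep only the text deltas and concatenate their text fields.
const fullText = events
  .filter((e) => e.type === "text_delta")
  .map((e) => e.text ?? "")
  .join("");
// fullText === "Hello world"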
The Anthropic SDK provides a messages.stream() method that returns an async iterable object. The underlying protocol is SSE, which the SDK has already parsed for us.
Here is the implementation of AnthropicProvider.stream:
async *stream(
messages: Message[],
options?: ChatOptions
): AsyncIterable<StreamEvent> {
const params: Record<string, unknown> = {
model: this.model,
max_tokens: options?.maxTokens ?? 4096,
messages: messages.map((m) => ({ role: m.role, content: m.content })),
};
if (options?.system) params.system = options.system;
const stream = this.client.messages.stream(
params as Anthropic.MessageCreateParamsNonStreaming
);
yield { type: "message_start" };
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
yield { type: "text_delta", text: event.delta.text };
}
}
yield { type: "message_stop" };
}
async def stream(
self, messages: list[Message], options: ChatOptions | None = None
) -> AsyncIterator[StreamEvent]:
options = options or ChatOptions()
params: dict = {
"model": self._model,
"max_tokens": options.max_tokens or 4096,
"messages": [
{"role": m.role, "content": m.content} for m in messages
],
}
if options.system:
params["system"] = options.system
yield StreamEvent(type="message_start")
async with self._client.messages.stream(**params) as stream:
async for event in stream:
if (
event.type == "content_block_delta"
and event.delta.type == "text_delta"
):
yield StreamEvent(type="text_delta", text=event.delta.text)
yield StreamEvent(type="message_stop")
Note the async * syntax: this makes the method an AsyncGenerator, which can yield events one at a time. Callers consume it with for await...of.
The raw Anthropic SSE stream carries many event types (message_start, content_block_start, content_block_delta, message_stop, and so on). We only care about content_block_delta events whose delta type is "text_delta": they carry the actual text increments.
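One gap worth flagging: the implementation above never emits the error event from our StreamEvent union. A possible refinement (a sketch, not this chapter's final design) is to wrap the loop in try/catch and translate exceptions into in-band error events:

async *stream(
  messages: Message[],
  options?: ChatOptions
): AsyncIterable<StreamEvent> {
  yield { type: "message_start" };
  try {
    // Same params as above (system prompt handling elided for brevity).
    const stream = this.client.messages.stream({
      model: this.model,
      max_tokens: options?.maxTokens ?? 4096,
      messages: messages.map((m) => ({ role: m.role, content: m.content })),
    } as Anthropic.MessageCreateParamsNonStreaming);
    for await (const event of stream) {
      if (
        event.type === "content_block_delta" &&
        event.delta.type === "text_delta"
      ) {
        yield { type: "text_delta", text: event.delta.text };
      }
    }
    yield { type: "message_stop" };
  } catch (err) {
    // Translate the exception into an in-band "error" event.
    yield {
      type: "error",
      text: err instanceof Error ? err.message : String(err),
    };
  }
}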
OpenAI's streaming is likewise built on SSE, but the data format differs: each chunk is a JSON object whose text sits at choices[0].delta.content.
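For reference, a single streamed chunk looks roughly like this (abridged; real chunks also carry id, model, and other metadata):

{"object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "Hel"}, "finish_reason": null}]}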
async *stream(
messages: Message[],
options?: ChatOptions
): AsyncIterable<StreamEvent> {
const stream = await this.client.chat.completions.create({
model: this.model,
max_tokens: options?.maxTokens ?? 4096,
messages: this.formatMessages(messages, options?.system),
stream: true,
});
yield { type: "message_start" };
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta;
if (delta?.content) {
yield { type: "text_delta", text: delta.content };
}
}
yield { type: "message_stop" };
}
async def stream(
self, messages: list[Message], options: ChatOptions | None = None
) -> AsyncIterator[StreamEvent]:
options = options or ChatOptions()
response = await self._client.chat.completions.create(
model=self._model,
max_tokens=options.max_tokens or 4096,
messages=self._format_messages(messages, options.system),
stream=True,
)
yield StreamEvent(type="message_start")
async for chunk in response:
delta = chunk.choices[0].delta if chunk.choices else None
if delta and delta.content:
yield StreamEvent(type="text_delta", text=delta.content)
yield StreamEvent(type="message_stop")
Almost exactly the same pattern as Anthropic! The differences come down to:

- How the stream is obtained: Anthropic's this.client.messages.stream(params) returns the stream object synchronously, while OpenAI's await this.client.chat.completions.create({...params, stream: true}) uses the stream: true parameter to switch on streaming, with await returning the stream.
- Where the text lives: Anthropic delivers it in event.delta.text, OpenAI in chunk.choices[0].delta.content.

The stream method signatures are exactly the same: both return AsyncIterable<StreamEvent>. Upstream code never needs to know whose API sits underneath; it just consumes the stream with for await. That is the power of interface abstraction.
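To make that concrete, here is a sketch of a provider-agnostic consumer. It assumes the LLMProvider interface and Message type from the earlier chapters (adjust the import path to your layout):

import type { LLMProvider, Message } from "./llm/types.js";

// Works identically with AnthropicProvider and OpenAICompatibleProvider,
// because both implement stream(): AsyncIterable<StreamEvent>.
async function streamToStdout(
  provider: LLMProvider,
  messages: Message[]
): Promise<string> {
  let fullText = "";
  for await (const event of provider.stream(messages)) {
    if (event.type === "text_delta" && event.text) {
      process.stdout.write(event.text); // typewriter effect
      fullText += event.text;
    }
  }
  process.stdout.write("\n");
  return fullText;
}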
Understanding async iteration is the key to understanding stream processing. Both languages have matching syntax: TypeScript's async * / for await...of, and Python's async def + yield / async for. Let's quickly go over the core concepts.
In TypeScript the contract is the AsyncIterable<T> interface; in Python it is the AsyncIterator[T] protocol. Either way, you consume it with for await...of (TS) or async for (Python).
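Under the hood, for await...of simply drives this protocol by calling next() repeatedly. The hand-rolled TypeScript object below (equivalent to what a small async generator would produce) makes the shape explicit:

// A manual AsyncIterable yielding 1, 2, 3 -- everything async function* abstracts away.
const counter: AsyncIterable<number> = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      async next(): Promise<IteratorResult<number>> {
        i += 1;
        return i <= 3
          ? { value: i, done: false }
          : { value: undefined, done: true };
      },
    };
  },
};

// Consumed exactly like a generator:
// for await (const n of counter) console.log(n); // 1, 2, 3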
Producers are written with async function* in TypeScript and with an async def containing yield in Python. Both let a function produce values one at a time, with the consumer pulling them on demand.
// Producer: async generator
async function* countdown(n: number): AsyncIterable<number> {
for (let i = n; i > 0; i--) {
await new Promise(resolve => setTimeout(resolve, 1000));
yield i;
}
}
// Consumer: for await...of
for await (const num of countdown(3)) {
console.log(num); // 3, 2, 1 (each after 1s)
}
# Producer: async generator
import asyncio
from typing import AsyncIterator
async def countdown(n: int) -> AsyncIterator[int]:
for i in range(n, 0, -1):
await asyncio.sleep(1)
yield i
# Consumer: async for (must run inside a coroutine)
async def main() -> None:
    async for num in countdown(3):
        print(num)  # 3, 2, 1 (each after 1s)

asyncio.run(main())
An alternative design would be callback-based, something like stream(msgs, callback). But async iteration has several clear advantages; most notably, interrupting the stream is as simple as breaking out of the loop, as the sketch below shows.
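A consumer that stops early, assuming a provider instance and messages array in scope; leaving a for await...of loop also triggers the generator's cleanup:

// Stop pulling once we have 100 characters of output.
let received = "";
for await (const event of provider.stream(messages)) {
  if (event.type === "text_delta" && event.text) {
    received += event.text;
    if (received.length >= 100) break; // ends the stream; generator cleanup runs
  }
}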
The key to testing streaming logic is constructing a mock async iterable object.
import { describe, it, expect, vi } from "vitest";
import { AnthropicProvider } from "../src/llm/anthropic.js";
import { OpenAICompatibleProvider } from "../src/llm/openai-compatible.js";
import type { Message, StreamEvent } from "../src/llm/types.js";
// Helper to create async iterable from array
async function* asyncIterable<T>(items: T[]): AsyncIterable<T> {
for (const item of items) yield item;
}
// Helper to collect all events from stream
async function collectEvents(stream: AsyncIterable<StreamEvent>): Promise<StreamEvent[]> {
const events: StreamEvent[] = [];
for await (const event of stream) events.push(event);
return events;
}
// ── Anthropic Streaming ──
vi.mock("@anthropic-ai/sdk", () => {
return {
default: class {
messages = {
create: vi.fn(),
stream: vi.fn().mockReturnValue(
asyncIterable([
{ type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } },
{ type: "content_block_delta", delta: { type: "text_delta", text: " world" } },
])
),
};
},
};
});
vi.mock("openai", () => {
return {
default: class {
chat = {
completions: {
create: vi.fn().mockResolvedValue(
asyncIterable([
{ choices: [{ delta: { content: "Hello" } }] },
{ choices: [{ delta: { content: " world" } }] },
])
),
},
};
},
};
});
describe("AnthropicProvider streaming", () => {
it("should yield message_start, text_delta events, and message_stop", async () => {
const provider = new AnthropicProvider({ apiKey: "test-key" });
const messages: Message[] = [{ role: "user", content: "Hi" }];
const events = await collectEvents(provider.stream(messages));
expect(events[0].type).toBe("message_start");
expect(events[1]).toEqual({ type: "text_delta", text: "Hello" });
expect(events[2]).toEqual({ type: "text_delta", text: " world" });
expect(events[events.length - 1].type).toBe("message_stop");
});
it("should produce full text from text_delta events", async () => {
const provider = new AnthropicProvider({ apiKey: "test-key" });
const messages: Message[] = [{ role: "user", content: "Hi" }];
let fullText = "";
for await (const event of provider.stream(messages)) {
if (event.type === "text_delta" && event.text) fullText += event.text;
}
expect(fullText).toBe("Hello world");
});
});
describe("OpenAICompatibleProvider streaming", () => {
it("should yield message_start, text_delta events, and message_stop", async () => {
const provider = new OpenAICompatibleProvider({
apiKey: "test-key",
baseURL: "https://api.deepseek.com",
model: "deepseek-chat",
});
const messages: Message[] = [{ role: "user", content: "Hi" }];
const events = await collectEvents(provider.stream(messages));
expect(events[0].type).toBe("message_start");
expect(events[1]).toEqual({ type: "text_delta", text: "Hello" });
expect(events[2]).toEqual({ type: "text_delta", text: " world" });
expect(events[events.length - 1].type).toBe("message_stop");
});
it("should produce full text from text_delta events", async () => {
const provider = new OpenAICompatibleProvider({
apiKey: "test-key",
baseURL: "https://api.deepseek.com",
model: "deepseek-chat",
});
const messages: Message[] = [{ role: "user", content: "Hi" }];
let fullText = "";
for await (const event of provider.stream(messages)) {
if (event.type === "text_delta" && event.text) fullText += event.text;
}
expect(fullText).toBe("Hello world");
});
});
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.llm.anthropic_provider import AnthropicProvider, AnthropicConfig
from src.llm.openai_compatible import OpenAICompatibleProvider, OpenAICompatibleConfig
from src.llm.types import Message, StreamEvent
async def collect_events(stream) -> list[StreamEvent]:
"""Collect all events from an async iterator."""
events = []
async for event in stream:
events.append(event)
return events
@pytest.mark.asyncio
class TestAnthropicStreaming:
async def test_yield_stream_events(self):
with patch("src.llm.anthropic_provider.AsyncAnthropic"):
provider = AnthropicProvider(AnthropicConfig(api_key="test-key"))
# Mock the stream context manager
mock_event1 = MagicMock()
mock_event1.type = "content_block_delta"
mock_event1.delta.type = "text_delta"
mock_event1.delta.text = "Hello"
mock_event2 = MagicMock()
mock_event2.type = "content_block_delta"
mock_event2.delta.type = "text_delta"
mock_event2.delta.text = " world"
mock_stream = MagicMock()
mock_stream.__aenter__ = AsyncMock(return_value=mock_stream)
mock_stream.__aexit__ = AsyncMock(return_value=False)
async def async_iter():
yield mock_event1
yield mock_event2
mock_stream.__aiter__ = lambda self: async_iter()
provider._client.messages.stream = MagicMock(return_value=mock_stream)
messages = [Message(role="user", content="Hi")]
events = await collect_events(provider.stream(messages))
assert events[0].type == "message_start"
assert events[1] == StreamEvent(type="text_delta", text="Hello")
assert events[2] == StreamEvent(type="text_delta", text=" world")
assert events[-1].type == "message_stop"
Pay special attention to the asyncIterable helper: it is an async function* that takes an array and yields its elements one by one, returning an AsyncIterable. This is the standard way to simulate streaming data in tests.
The collectEvents function does exactly the opposite: it collects an AsyncIterable into a plain array, which makes assertions easy.
$ npm test
✓ tests/stream.test.ts (4 tests) 8ms
✓ AnthropicProvider streaming > should yield message_start, text_delta events, and message_stop
✓ AnthropicProvider streaming > should produce full text from text_delta events
✓ OpenAICompatibleProvider streaming > should yield message_start, text_delta events, and message_stop
✓ OpenAICompatibleProvider streaming > should produce full text from text_delta events
$ cd python && python -m pytest tests/test_stream.py -v
tests/test_stream.py::TestAnthropicStreaming::test_yield_stream_events PASSED
tests/test_stream.py::TestAnthropicStreaming::test_full_text_from_deltas PASSED
tests/test_stream.py::TestOpenAICompatibleStreaming::test_yield_stream_events PASSED
tests/test_stream.py::TestOpenAICompatibleStreaming::test_full_text_from_deltas PASSED
In this chapter we learned:

- The streaming event lifecycle is always start → delta* → stop.
- Async generators (async *) are a natural fit for implementing streaming producers, paired with for await...of on the consuming side.
- Both providers sit behind the same LLMProvider.stream interface, so upper layers never need to notice the difference.

In the next chapter we will implement multi-turn conversation management: letting the Agent remember context and hold a continuous conversation.