
May 29, 2026 · 9:05 AM
Claude Code SDK #5:消息流全解——6 种消息类型 × async for,把 Agent 执行过程变成可编程事件流
query() 返回的不是字符串,而是持续吐出消息对象的异步流。SDK 定义了 6 种消息类型:SystemMessage(会话初始化与事件)、AssistantMessage(每轮 Claude 回复,含工具调用决策)、UserMessage(用户输入与工具结果回传)、ResultMessage(任务终态 + 成本统计)、StreamEvent 和 RateLimitEvent。本篇完整拆解每种类型的数据结构、五种典型消费模式的代码写法、async for 的正确使用姿势(含「不要用 break 提前退出」的坑),以及 CLI 侧 stream-json 模式的对应关系。
每次调用
query(),你得到的不是一个字符串,而是一条持续吐出消息对象的异步流。这条流里有初始化事件、每轮 Assistant 回复、工具调用记录、最终结果、成本统计……读懂它,等于拿到了 Agent 整个执行过程的可观测接口。
本篇完整拆解 SDK 的 6 种消息类型、
async for 消费方式的正确写法,以及五种典型过滤场景的代码模式。为什么 query() 要返回「流」而不是「结果」
传统 API 调用是请求-响应:等它跑完,拿返回值。Agent 不一样——它要读文件、执行命令、调 Web 搜索,可能跑几十秒甚至几分钟。
把整个执行过程包装成流,有三个实际好处:
- 进度可见:工具被调用时立刻知道,不用盲等
- 中间态可干预:配合 Hooks,可以在工具调用前/后插入自己的逻辑(详见后续 Hooks 篇)
- 成本可核:
ResultMessage携带精确 token 用量和估算费用,方便计量
query() 的返回值是一个 async generator,每产生一条消息对象就 yield 一次,直到 Agent 完成。消息类型总览
Python SDK 定义了 6 种消息类型,构成一个联合类型
Message 1:Message = (
UserMessage
| AssistantMessage
| SystemMessage
| ResultMessage
| StreamEvent
| RateLimitEvent
)| 类型 | 触发时机 | 核心字段 |
|---|---|---|
SystemMessage | 会话初始化、内部系统事件 | subtype、data |
AssistantMessage | 每次 Claude 产出回复 | content、model、usage |
UserMessage | 用户输入,含工具结果回传 | content、tool_use_result |
ResultMessage | Agent 完成整轮任务 | result、total_cost_usd、usage、num_turns |
StreamEvent | 部分流式事件(需启用 --include-partial-messages) | 由底层 API 事件构成 |
RateLimitEvent | 触发限速时 | 限速相关元数据 |
日常开发中打交道最多的是前 4 种。
StreamEvent 和 RateLimitEvent 留到高级场景处理。Loading stats card…
逐类型解剖
SystemMessage:会话的「开机广播」
会话启动时,SDK 首先 yield 一条
subtype="init" 的 SystemMessage,data 字段里有本轮会话 ID:@dataclass
class SystemMessage:
subtype: str # "init"、"session_end" 等
data: dict[str, Any] # 元数据,因 subtype 不同而异最常用的场景:从
init 消息里捞 session_id,用于后续的 Session continue / resume:import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, SystemMessage
async def main():
session_id = None
async for message in query(
prompt="分析 utils.py 里的函数",
options=ClaudeAgentOptions(allowed_tools=["Read", "Glob"]),
):
if isinstance(message, SystemMessage) and message.subtype == "init":
session_id = message.data["session_id"]
print(f"Session ID: {session_id}")
asyncio.run(main())AssistantMessage:Claude 的每轮回复
每次 Claude 产出内容,无论是文字还是工具调用决策,都包装成一条
AssistantMessage:@dataclass
class AssistantMessage:
content: list[ContentBlock] # TextBlock / ToolUseBlock 等
model: str # 生成该条回复的模型名
parent_tool_use_id: str | None = None # 子 Agent 场景:父工具 ID
error: AssistantMessageError | None = None # 出错时的错误类型
usage: dict[str, Any] | None = None # 本条消息的 token 用量
message_id: str | None = None # API 消息 IDcontent 是 ContentBlock 列表。常见子类型:TextBlock:Claude 的文本输出,读.text字段ToolUseBlock:Claude 决定调用工具,包含工具名和参数
error 可能的值:authentication_failed、billing_error、rate_limit、invalid_request、server_error、max_output_tokens、unknown。提取文本输出的标准写法:
from claude_agent_sdk import AssistantMessage, TextBlock
async for message in query(prompt="..."):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude 说:{block.text}")
# 工具调用不在这里处理,SDK 会自动执行UserMessage:工具结果的回传通道
UserMessage 不只是用户输入——工具执行完毕后,结果以 UserMessage 的形式回传给 Claude。tool_use_result 字段标识它是一条工具结果消息:@dataclass
class UserMessage:
content: str | list[ContentBlock]
uuid: str | None = None
parent_tool_use_id: str | None = None # 对应哪次工具调用
tool_use_result: dict[str, Any] | None = None # 工具执行结果通常不需要手动处理
UserMessage——SDK 的 Agent 循环自动处理工具调用和结果回传。但在调试或审计场景,可以通过它观察工具实际返回了什么。ResultMessage:任务终结符,带成本清单
整轮 Agent 任务完成后,SDK yield 最后一条
ResultMessage:@dataclass
class ResultMessage:
subtype: str
duration_ms: int # 总耗时(毫秒)
duration_api_ms: int # API 调用累计耗时(毫秒)
is_error: bool # 是否以错误结束
num_turns: int # Agent 执行的回合数
session_id: str
stop_reason: str | None = None
total_cost_usd: float | None = None # 估算总费用(美元)
usage: dict[str, Any] | None = None # 含 input_tokens、output_tokens、
# cache_creation_input_tokens、
# cache_read_input_tokens
result: str | None = None # 最终文本结果
structured_output: Any = None # 结构化输出(需配合 --json-schema)关键点:
result 字段才是 Agent 的最终答案。之前所有 AssistantMessage 是过程输出;ResultMessage.result 是终态输出。Loading chart…
快速提取结果的最简写法:
from claude_agent_sdk import ResultMessage
async for message in query(prompt="总结这个代码库"):
if isinstance(message, ResultMessage):
if message.is_error:
print(f"执行失败,原因:{message.stop_reason}")
else:
print(f"结果:{message.result}")
print(f"耗时:{message.duration_ms}ms")
print(f"费用:${message.total_cost_usd:.4f}")
print(f"Token 用量:{message.usage}")五种典型消费模式
1. 只要最终结果(最简模式)
async for message in query(prompt="..."):
if hasattr(message, "result"):
print(message.result)hasattr(message, "result") 比 isinstance(message, ResultMessage) 更宽松,适合快速脚本。2. 实时进度跟踪(长任务必备)
from claude_agent_sdk import (
query, ClaudeAgentOptions,
SystemMessage, AssistantMessage, TextBlock, ResultMessage
)
async for message in query(
prompt="重构整个 src/ 目录",
options=ClaudeAgentOptions(allowed_tools=["Read", "Edit", "Bash", "Glob"]),
):
if isinstance(message, SystemMessage) and message.subtype == "init":
print(f"[开始] session={message.data['session_id']}")
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock) and block.text:
print(f"[Claude] {block.text[:80]}...")
elif isinstance(message, ResultMessage):
print(f"[完成] {message.num_turns} 轮 | {message.duration_ms}ms | ${message.total_cost_usd:.4f}")3. 成本监控(生产环境必备)
costs = []
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
costs.append({
"cost": message.total_cost_usd,
"input_tokens": message.usage.get("input_tokens", 0),
"output_tokens": message.usage.get("output_tokens", 0),
"cache_read": message.usage.get("cache_read_input_tokens", 0),
})cache_read_input_tokens 是缓存命中的 token 数,命中率高说明 prompt 构造效率好,可以作为优化指标。4. 错误分级处理
from claude_agent_sdk import AssistantMessage
async for message in query(prompt="..."):
if isinstance(message, AssistantMessage) and message.error:
if message.error == "rate_limit":
print("触发限速,稍后重试")
elif message.error == "max_output_tokens":
print("输出超长,考虑拆分任务")
elif message.error in ("billing_error", "authentication_failed"):
raise RuntimeError(f"账号问题:{message.error}")5. 子 Agent 消息过滤(多 Agent 场景)
在子 Agent 场景(使用
AgentDefinition + Agent 工具),子 Agent 产生的消息会携带 parent_tool_use_id,用于区分消息来源:async for message in query(
prompt="用 code-reviewer 审查代码",
options=ClaudeAgentOptions(
allowed_tools=["Read", "Glob", "Grep", "Agent"],
agents={"code-reviewer": AgentDefinition(...)},
),
):
if isinstance(message, AssistantMessage):
if message.parent_tool_use_id is None:
print(f"[主 Agent] {message}")
else:
print(f"[子 Agent:{message.parent_tool_use_id[:8]}] {message}")一个坑:不要用 break 提前退出
SDK 文档明确提到:迭代消息流时,避免用
break 提前中断循环,否则可能触发 asyncio 清理问题 1。正确做法是用标记变量控制逻辑,让迭代自然结束:
# ❌ 有潜在问题
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
result = message.result
break # 可能导致 asyncio 清理异常
# ✅ 推荐写法
result = None
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
result = message.result
# 不 break,让流自然结束CLI 侧的流式输出:stream-json
SDK 的消息流对应 CLI 的
--output-format stream-json 模式 2。每条消息以换行符分隔的 JSON 对象实时输出到 stdout,适合脚本管道消费:# 实时处理每条消息
claude -p "分析 src/ 目录" --output-format stream-json \
| jq 'select(.type == "result") | {result, cost: .total_cost_usd}'
# 配合 --include-partial-messages 获取更细粒度的流式事件
claude -p "..." --output-format stream-json \
--verbose --include-partial-messages--verbose 标志让流包含完整的工具调用细节;--include-partial-messages 进一步暴露文本流式生成的中间片段(对应 Python SDK 的 StreamEvent)。Python SDK 完整源码与类型定义可在官方仓库查看:
Loading content card…
实践建议
1. 生产脚本必须监听
ResultMessage.is_error
不要假设 Agent 总会成功。is_error=True + stop_reason 能给出可程序化处理的失败原因。2. 用
session_id 做日志关联
每轮任务的所有消息共享同一 session_id,是链路追踪的天然 key。把它写进日志,就能按任务查全量记录。3. 关注
num_turns
ResultMessage.num_turns 反映 Agent 的「思考深度」。同样任务 turns 骤增可能意味着 prompt 不清或工具调用陷入循环,值得告警。4. 用
usage.cache_read_input_tokens 优化成本
这个字段显示 prompt cache 命中的 token 量。对于重复运行的 CI 场景,cache 命中率直接影响 API 费用,可以把它纳入成本看板。下一篇:Hooks 系统——在 PreToolUse / PostToolUse / Stop 等生命周期节点注入自定义逻辑,用回调函数实现审计、拦截和副作用处理。
References
More from this channel
- Claude Code SDK #9:自定义工具全解——@tool 装饰器 × in-process MCP × 错误处理 × 非文本返回,把任意函数变成 Claude 的能力
- Claude Code SDK #8:MCP 集成全解——三种接入方式 × 工具命名规范 × Tool Search 懒加载,把 Agent 能力边界推到任意外部服务
- Claude Code SDK #7:子 Agent 编排全解——上下文隔离 × 工具沙盒 × 并行加速,让主 Agent 不再被子任务撑爆
- Claude Code SDK #6:Hooks 系统全解——在 Agent 生命周期的每个节点插入你的代码
- Claude Code SDK #4:内置工具全解——11 种工具 × 5 种权限模式,精准控制 Agent 能干什么
- Claude Code SDK #3:Session 管理全解——continue / resume / fork 三把钥匙,让 Agent 真正拥有记忆
- Claude Code SDK #2:--output-format 三种模式全解——把 Claude 嵌进任何脚本的正确姿势
- Claude Agent SDK #1:query() 函数全解——三行代码启动一个自主 Agent
Related content
- Sign in to comment.