
April 30, 2026 · 9:57 AM
OpenAI Agents SDK #9:让 Agent「边跑边说」——Streaming 流式输出全解析
从「盯着空白屏幕干等 Agent 结果」的开发者痛点切入,系统拆解 OpenAI Agents SDK 的 Streaming 机制。覆盖 Runner.run_streamed() 完整接口签名与 RunResultStreaming 关键成员、StreamEvent 三层事件类型(RawResponsesStreamEvent / RunItemStreamEvent / AgentUpdatedStreamEvent)及 11 种 RunItemStreamEvent name 枚举、Tool Call 事件的「调用-输出」两阶段处理与 Sub-agent 事件透传机制、流式 Guardrail 的输入/输出不对称行为。附两个完整带注释代码示例(打字机效果 + 工作流追踪),结尾给出 3 条可直接落地的实践建议,并预告 #10 Context 变量管理。
你有没有遇到过这种体验:把一个任务甩给 Agent,然后盯着空白屏幕干等,不知道它在干什么、跑了多久,也不知道它是卡死了还是在思考——直到某一刻,满屏结果一下子全涌出来。
这种「黑盒等待感」,是
Runner.run() 的设计取舍。run() 只会在整个 Agent Loop 跑完后才把结果交给你。一个需要多次工具调用的任务,可能要等十几秒甚至更久——对聊天类应用几乎不可接受,对长时间任务来说更是直接「失联」。解法很直接:
Runner.run_streamed()1。一句话说清楚 Streaming 是什么
run_streamed() 不等 Agent 跑完,而是把 Agent Loop 的每一个「动作」实时包装成事件,推给你的消费者代码。你拿到的不是最终答案,而是一条持续流入的事件流——每收到一个事件,就能立刻处理、渲染、记录或转发。类比一下:
run() 像点外卖等配送,run_streamed() 像在厨房旁边坐着,厨师每做好一道工序就立刻告诉你。这个类比有一处不成立——它不是按「菜」的粒度推送,而是分两个层次:原始 token 粒度(字字涌出)和高级语义粒度(工具调用、消息创建、Agent 切换)。选哪层,取决于你的场景。
run_streamed() 接口
完整签名如下2:
Runner.run_streamed(
starting_agent: Agent[TContext],
input: str | list[TResponseInputItem] | RunState[TContext],
context: TContext | None = None,
max_turns: int = DEFAULT_MAX_TURNS,
hooks: RunHooks[TContext] | None = None,
run_config: RunConfig | None = None,
previous_response_id: str | None = None,
auto_previous_response_id: bool = False,
conversation_id: str | None = None,
session: Session | None = None,
*,
error_handlers: RunErrorHandlers[TContext] | None = None,
) -> RunResultStreaming参数和
run() 几乎一模一样,区别在于返回值:不是 RunResult,而是 RunResultStreaming。RunResultStreaming 上有这几个关键成员:stream_events()—— 异步迭代器,消费事件流的核心入口interruptions—— 用于 Human-in-the-Loop 工具审批的中断列表cancel()/cancel(mode="after_turn")—— 中止任务(立即 vs 当前轮结束后)to_state()—— 序列化当前状态,用于持久化或恢复is_complete—— 布尔属性,表示流是否已结束
StreamEvent 三种类型全解析
事件类型定义为
Union 三元组,每种类型有不同的语义和用途4:Loading stats card…
① RawResponsesStreamEvent
type == "raw_response_event",携带 data: TResponseStreamEvent,即 Responses API 的原始流事件。这是粒度最细的事件,包含逐 token 的文字增量(ResponseTextDeltaEvent)。想实现聊天界面「字字涌现」的打字效果,就靠它。② RunItemStreamEvent
type == "run_item_stream_event",带 name 和 item 两个字段。name 的完整枚举值:| name 值 | 含义 |
|---|---|
message_output_created | 模型完成一条消息输出 |
tool_called | 函数工具被调用(含参数) |
tool_output | 函数工具执行完毕(含返回值) |
tool_search_called | 搜索工具被触发 |
tool_search_output_created | 搜索工具返回结果 |
handoff_requested | Agent 发起 Handoff 请求 |
handoff_occured | Handoff 完成,控制权已转移 |
reasoning_item_created | 模型产生推理步骤(CoT) |
mcp_approval_requested | MCP 工具请求用户审批 |
mcp_approval_response | MCP 审批结果写入 |
mcp_list_tools | 列出可用 MCP 工具 |
用这层事件,你不关心模型在输出哪个字,只关心「调了什么工具」「工具返回了什么」「任务有没有被甩给其他 Agent」。
③ AgentUpdatedStreamEvent
type == "agent_updated_stream_event",携带 new_agent: Agent[Any]。每次 Handoff 完成后触发,告诉你现在是哪个 Agent 在跑。如果你的工作流里有多 Agent 协作,这个事件可以用来更新 UI 上的「当前处理中:XX Agent」进度提示。完整代码示例:从 token 级到语义级

示例一:实现打字机效果(token 粒度)5
import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner
agent = Agent(
name="chat_agent",
instructions="你是一个友好的助手,用简洁的中文回答问题。",
)
async def main():
# run_streamed() 立即返回,不等 Agent Loop 完成
result = await Runner.run_streamed(agent, input="用三句话介绍一下量子计算。")
print("=== 开始流式输出 ===")
async for event in result.stream_events():
# 只关心原始 token 事件
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
# 逐 token 打印,end="" 不换行,flush=True 立即刷新缓冲区
print(event.data.delta, end="", flush=True)
print("\n=== 流式完成 ===")
asyncio.run(main())核心就三行:调用
run_streamed(),async for 遍历事件,检测 ResponseTextDeltaEvent 打印 delta。示例二:追踪工具调用与 Agent 切换(语义粒度)6
import asyncio
from agents import Agent, Runner
from agents.items import ToolCallItem, ToolCallOutputItem, MessageOutputItem
# 假设已定义一个带工具调用的 agent
agent = Agent(
name="research_agent",
instructions="你是一个研究助手,使用工具查找信息后综合回答。",
tools=[search_web], # 自定义工具
)
async def main():
result = await Runner.run_streamed(agent, input="搜索 OpenAI 最新动态")
current_agent_name = agent.name
print(f"▶ 开始(Agent: {current_agent_name})")
async for event in result.stream_events():
# 跳过 token 级噪音,只看语义事件
if event.type == "raw_response_event":
continue
# Agent 切换事件 —— 更新当前 Agent 名称
if event.type == "agent_updated_stream_event":
current_agent_name = event.new_agent.name
print(f"\n🔀 Agent 切换 → {current_agent_name}")
continue
# 语义事件处理
if event.type == "run_item_stream_event":
item = event.item
if isinstance(item, ToolCallItem):
# 工具被触发:打印工具名和参数
print(f"\n🔧 工具调用: {item.raw_item.name}")
print(f" 参数: {item.raw_item.arguments}")
elif isinstance(item, ToolCallOutputItem):
# 工具执行完毕:打印返回结果(截断避免刷屏)
output_preview = str(item.output)[:100]
print(f" ✓ 结果: {output_preview}...")
elif isinstance(item, MessageOutputItem):
# 最终消息输出完成
print(f"\n💬 消息输出完成")
print("\n◼ 完成")
asyncio.run(main())这段代码完整捕获了 Agent 工作流的全部「动作节点」。没有黑盒,每一步都看得见。
处理工具调用事件的细节
流式模式下处理 Tool Call,有几个细节容易踩坑6:
区分「调用」和「输出」:
tool_called 在工具触发时立刻发出,这时工具还没跑完;tool_output 在工具返回结果后才发出。想在 UI 上显示「正在调用工具 X...」和「工具 X 完成」两种状态,就得分别处理这两个事件。Sub-agent 事件透传:Agent 通过 Handoffs 调用子 Agent 时,子 Agent 的流式事件会透传到外层的
stream_events() 里8。嵌套工作流里,从顶层就能拿到所有层级的事件,不需要在每一层单独订阅。MCP 审批流程:工具需要用户确认时(Human-in-the-Loop),流里会先出现
mcp_approval_requested 事件,此时流暂停,你处理 result.interruptions 完成审批后,流才继续推进。流式 Guardrail 的特殊行为
Guardrail 在流式模式下的行为,和非流式有一处关键差异9:
Input Guardrail(输入检查):
blocking 模式下,Input Guardrail 在流开始前就跑完了。输入触发 Tripwire 的话,流根本不会启动——run_streamed() 返回后立刻抛 InputGuardrailTripwireTriggered,stream_events() 没有任何数据。Output Guardrail(输出检查):流式模式下,Output Guardrail 在所有事件流发出之后才执行。token 已经流给你了,最后才检查。如果 Tripwire 触发,
OutputGuardrailTripwireTriggered 在流结束时抛出,但内容已被部分消费。应用不能接受「先流出再撤回」的话,需要在自己这层做缓冲——先攒着,检查通过后再渲染给用户。run_in_parallel 的影响:Input Guardrail 支持 run_in_parallel=True 并发执行。对流式场景来说,多个 Guardrail 并发意味着结果更快收拢,流的等待时延更短。但任一 Guardrail 触发 Tripwire,整个流还是会被中止。一句话总结:流式模式下,Input Guardrail 是「前门」,Output Guardrail 是「事后复核」——两者不对称,设计检查逻辑时要分开考虑。
两种实际应用场景
场景一:聊天界面实时打字效果
最常见的用法。用
RawResponsesStreamEvent + ResponseTextDeltaEvent,每收到一个 delta 就 append 到前端文本框。用户感受从「等 10 秒看到完整回答」变成「0.3 秒开始出现文字,持续流入」。ChatGPT、Claude、Gemini 默认开流式输出不是偶然——不是因为更快,是因为让用户感觉更快,等待焦虑大幅降低。
场景二:进度感知与任务监控
对于需要多步工具调用的 Agent(比如搜索 + 总结 + 格式化的 Research Agent),用
RunItemStreamEvent 驱动一个进度面板:- 收到
tool_called→ 在 UI 显示「正在搜索...」 - 收到
tool_output→ 更新为「搜索完成,正在分析...」 - 收到
agent_updated_stream_event→ 显示「当前 Agent:Summarizer」 - 收到
message_output_created→ 显示「任务完成」
长周期任务里,这种可见性让用户有事可盯,不至于盯着空白屏幕发呆;开发者调试时也能直接看哪个工具最慢,不用猜10。
3 条落地建议
① 按需分层订阅,别把两个层次混在一起
RawResponsesStreamEvent 和 RunItemStreamEvent 的用途不一样。聊天输出用 Raw,任务监控用 Item。两者混在同一个 async for 里,条件判断会越写越乱。建议封装成两个独立函数:handle_token_stream() 和 handle_item_stream(),按场景选用或组合。② 永远处理 stream 结束后的异常
stream_events() 迭代结束不等于任务成功。Output Guardrail 触发、max_turns 超限、工具超时,都可能在流结束后才抛异常。包一个 try/except:try:
async for event in result.stream_events():
# 处理事件
pass
# 流正常结束后,访问最终结果
final_output = result.final_output
except OutputGuardrailTripwireTriggered as e:
# 输出检查失败
handle_guardrail_failure(e)
except MaxTurnsExceeded:
# 超过最大轮次
handle_timeout()③ 长任务用
cancel(mode="after_turn") 优雅退出用户点「停止」时,直接
cancel() 立刻终止,工具可能卡在中间态。cancel(mode="after_turn") 等当前轮动作跑完再停,工具状态完整,history 也是干净的,后续想从 to_state() 恢复会容易很多。下一篇预告
#10 将深入 Context 变量管理:
RunContextWrapper、本地 Context 与 LLM-visible Context 的边界在哪里、ToolContext 的额外元数据怎么用,以及多 Agent 工作流下的状态共享策略。封面图:AI 生成,视觉概念来自 Streaming 数据流主题
References
- 1OpenAI Agents SDK – Streaming
- 2OpenAI Agents SDK – Runner API Reference
- 3OpenAI Agents SDK – Running Agents
- 4OpenAI Agents SDK – Stream Events API Reference
- 5GitHub – stream_text.py 示例
- 6GitHub – stream_items.py 示例
- 7GitHub Releases – v0.14.8
- 8知乎 – OpenAI Agents SDK 源码看 Agent 设计理念
- 9OpenAI Agents SDK – Guardrails
- 10知乎 – Agent Runtime:让 AI 智能体在生产环境中真正跑起来
More from this channel
- OpenAI Agents SDK #13:每次多轮对话都要手写 `.to_input_list()`?Sessions 帮你彻底告别这个坑
- OpenAI Agents SDK #12:你的 Agent 跑到第 8 轮突然停了——你却看不到任何错误信息
- OpenAI Agents SDK #11:多模型调度背后,你不知道的优先级覆盖链
- OpenAI Agents SDK #10:99% 的开发者都搞错了——Context 到底传没传给 LLM?
- OpenAI Agents SDK #8:为 Agent 装上「双保险」——Guardrails 防护栏全解析
- OpenAI Agents SDK #7:Tracing——让 Agent 的每一步都「可见」
- OpenAI Agents SDK #6:把 Agent 关进「安全箱」——Sandbox 执行环境全解析
- OpenAI Agents SDK #5:Memory——让 Agent 真正「记住」你
Related content
- Sign in to comment.