第四章 人机协作中断:让 Agent 在关键节点停下来等你
"完全自动化的 Agent 是危险的——你永远不知道它会调用什么工具、花掉多少钱、泄露什么数据。真正可控的 Agent 知道在哪里停下来,等人类点头。"
一、问题
前三章的图是全自动的:模型自动思考、自动调用工具、自动返回结果。但放到真实业务中,"全自动"是双刃剑:
- 不可审查的决策:模型决定调用
send_email或delete_user,没有设闸就会真的执行。 - 不可控的副作用:第三方 API 可能花钱、可能触发不可逆操作、可能泄露隐私。
- 调试困难:自动执行的流程出错后只能翻日志。如果能暂停在"执行工具"之前,排查会快得多。
- 信任危机:用户不信任一言不合就自动执行的系统。
核心矛盾:我们解决了"状态怎么流转"和"状态怎么持久化",但没有解决"流程怎么被人类控制"。Agent 需要暂停点。
LangGraph 把这个能力叫做 Human-in-the-loop(HIL)——在图的任意节点之前或之后插入"中断点",执行流到达时停下来,暴露状态给调用方,等待人类介入后再继续。
二、解决方案
三件事:
- 声明中断点:编译图时告诉 LangGraph,在
tools节点前暂停。 - 改用 stream 运行:用
stream把执行变成"可暂停的直播"。 - 实现审查与恢复:用
get_state()查看断点,用input()收集用户意愿,用Command(resume=True)继续。
START → agent → [should_continue 返回 "tools"] → [中断点] → 人类审查 → tools → agent → END
↑
interrupt_before=["tools"]
三、工作原理
1. 为什么 invoke 不行?
final_state = app.invoke(inputs, config)
# 跑到 tools 前被 interrupt 拦住 → 抛出 GraphInterrupt → 程序崩溃
invoke 的设计是"一次性跑完并给我最终结果"。遇到中断时它不知道该怎么返回,只能抛异常。人机协作需要的是"平静地停下来"。
2. stream:把"黑盒"变成"直播"
# ❌ 不遍历 = 图根本不跑
app.stream(inputs, config, stream_mode="values")
# ✅ for 循环驱动生成器,图一步步执行
for chunk in app.stream(inputs, config, stream_mode="values"):
pass # pass = 不看中间战报,但要遍历完
stream 返回生成器,Python 生成器是"懒"的——你不索要下一个值,它就原地不动。遇到中断点时,stream 平静结束,不抛异常,你可以在循环结束后查看状态。
3. stream_mode="values"
每个 chunk 是当前完整状态快照:
# 第一次 stream:跑完 agent,在中断点停住
{
'messages': [HumanMessage(...), AIMessage(tool_calls=[...])], ...}
# 第二次 stream(Command(resume=True)):继续跑完剩余流程
{
'messages': [..., ToolMessage(...), AIMessage("上海现在30℃...")], ...}
4. 判断中断:get_state().next
state = app.get_state(config)
if state.next: # 空 () 表示跑完;('tools',) 表示被拦住了
print(f">>> 中断:图即将进入节点 {state.next}")
| 属性 | 含义 |
|---|---|
state.values |
当前所有状态字段的值 |
state.next |
待执行的节点名元组。空=跑完,有值=中断暂停 |
5. 审查工具调用请求
last_message = state.values["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
for tc in last_message.tool_calls:
print(f"工具名: {tc['name']}, 参数: {tc['args']}")
中断发生时,模型已经输出了 tool_calls,但工具函数还没有执行。这是人类介入的最佳时机。
6. Command(resume=True) 的含义
# 用户同意,继续执行
for chunk in app.stream(Command(resume=True), config, stream_mode="values"):
pass
Command(resume=True) 告诉 LangGraph:"从中断点继续执行,被拦住的节点(tools)自己去状态里读取所需数据,继续跑。"
resume 参数是"喂给被中断节点的新输入"。但 ToolNode 是全自动的——它会自己从 state["messages"][-1].tool_calls 读取工具调用指令,不需要你额外喂数据。所以 True 就是"我同意,继续"。
7. 完整执行流程
# 第1步:启动 stream,图跑到中断点停住
for chunk in app.stream(inputs, config, stream_mode="values"):
pass
# 第2步:检查是否真的被拦住了
state = app.get_state(config)
if state.next:
# 第3步:审查
print("模型想调用:", state.values["messages"][-1].tool_calls)
user_input = input("是否同意?(y/n): ")
# 第4步:恢复或终止
if user_input.lower() == "y":
for chunk in app.stream(Command(resume=True), config, stream_mode="values"):
pass
else:
print("用户拒绝执行")
# 第5步:输出最终结果
final = app.get_state(config)
print(final.values["messages"][-1].content)
四、核心组件一览
| 组件 | 类 / 函数 / 参数 | 作用 |
|---|---|---|
| 中断声明 | interrupt_before=["tools"] |
编译图时声明:执行流进入 tools 节点前自动暂停 |
| 流式执行 | app.stream(...) |
返回生成器,遍历驱动图执行,遇中断平静结束 |
| 战报格式 | stream_mode="values" |
每次 chunk 返回当前完整状态快照 |
| 状态读取 | app.get_state(config) |
获取指定 thread 的最新检查点 + 中断信息 |
| 断点判断 | state.next |
元组,空表示跑完,有元素表示在中断点暂停 |
| 恢复命令 | Command(resume=True) |
从中断点继续执行被拦住的节点 |
| 用户输入 | input() |
阻塞等待用户键盘输入,实现真正的人机协作 |
五、试一试
前置条件
.env 配置(同第三章):
OPENAI_API_KEY=sk-...
POSTGRES_URI=postgresql://postgres:你的密码@localhost:5432/langgraph_demo
执行脚本
cd demo-01
python langgraph-04.py
预期交互:
>>> [人机协作中断] 图即将进入节点: ('tools',)
模型请求调用以下工具:
- 工具名: search
参数: {'query': '上海天气'}
是否同意执行?(y/n): y
=== 最终回答 ===
上海目前的天气情况:温度30℃,有雾。
验证中断机制
- 输入
n拒绝:程序打印"用户拒绝执行"并结束,工具函数不会被执行。 - 查看检查点:用 SQL 查询
checkpoints表,能看到步骤数只累加到中断前的那一轮——拒绝后不会生成新的完整快照。 - 脏数据问题:如果之前运行失败(比如流被意外中断),同一个
thread_id可能残留"模型喊了 tool_calls 但没等工具结果"的脏状态,下次运行会报 400 错误。解决方案:换个新thread_id或清理数据库。
六、完整代码差异
与第三章相比,只有执行逻辑变了。图的定义、状态、工具、模型、检查点——全部复用。
from langgraph.types import Command # 新增导入
# 编译图时声明中断点
app = workflow.compile(
checkpointer=checkpointer,
interrupt_before=["tools"]
)
thread_id = "chapter-5"
config = {
"configurable": {
"thread_id": thread_id}}
inputs = {
"messages": [HumanMessage(content="你好,我想查询一下上海的天气")],
"temperature_unit": "摄氏度"
}
# 第1步:启动,跑到中断点停住
for chunk in app.stream(inputs, config, stream_mode="values"):
pass
# 第2步:检查中断
state = app.get_state(config)
if state.next:
print(f">>> [人机协作中断] 图即将进入节点: {state.next}")
last_message = state.values["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
print("模型请求调用以下工具:")
for tc in last_message.tool_calls:
print(f" - 工具名: {tc['name']}")
print(f" 参数: {tc['args']}")
# 第3步:人工确认
user_input = input("是否同意执行?(y/n):")
if user_input.lower() == "y":
# 第4步:恢复执行
for chunk in app.stream(Command(resume=True), config, stream_mode="values"):
pass
else:
print("用户拒绝执行")
# 第5步:输出最终回答
final_state = app.get_state(config)
print("\n=== 最终回答 ===")
print(final_state.values["messages"][-1].content)
七、其他
三种人工干预模式
LangGraph 的中断点不是只能"看和放行",你可以根据业务需求选择三种干预深度:
| 模式 | 操作 | 代码体现 |
|---|---|---|
| 审查放行 | 只看不改,用户确认后继续 | Command(resume=True) |
| 修改参数 | 审查后改掉工具调用的参数 | app.update_state(config, {...}) + Command(resume=True) |
| 拒绝执行 | 不让工具跑,塞一条假结果让模型重新回答 | app.update_state(config, {"messages": [...]}) + Command(resume=True) |
interrupt_before vs interrupt_after
| 参数 | 中断时机 | 适用场景 |
|---|---|---|
interrupt_before=["tools"] |
进入节点之前 | 审查模型想调用什么工具,人工确认后再执行 |
interrupt_after=["tools"] |
离开节点之后 | 审查工具返回的结果,人工修正后再交给模型 |
可以同时使用:interrupt_before=["tools"], interrupt_after=["tools"],实现"进入前审查请求 + 离开后审查结果"的双重控制。
关于 resume 参数的版本差异
理论上 Command(resume=None) 表示"不给被中断节点额外输入,让它自己读状态"。但部分 LangGraph 版本在处理 None 时存在内部变量未初始化的 bug,会报 UnboundLocalError: cannot access local variable 'resume_is_map'。如果你的版本遇到这个问题,改用 Command(resume=True) 即可。
脏数据陷阱
每次实验建议用新的 thread_id,或在数据库中清理旧记录:
DELETE FROM checkpoints WHERE thread_id = 'chapter-5';
DELETE FROM checkpoint_writes WHERE thread_id = 'chapter-5';
DELETE FROM checkpoint_blobs WHERE thread_id = 'chapter-5';
原因是:如果之前某次运行流被意外中断(比如程序崩溃),状态里可能残留一条"模型喊了 tool_calls 但还没等工具结果"的消息。下次恢复运行时,模型 API 会拒绝这种不完整的历史,报 400 - tool_call_ids did not have response messages。