LangGraph 入门:用图结构构建你的第一个多智能体工作流

简介: LangGraph 是面向多智能体系统的图编排框架,以有向状态图替代线性链式调用。通过节点(智能体)、边(条件/静态跳转)和类型化共享状态三者解耦,天然支持分支、循环、并行与汇合;内置检查点、原子状态更新与Reducer机制,保障一致性、可调试性与容错恢复能力。

LangGraph 设计的一个核心是:多智能体工作流本质上是图结构,而非线性链。早期 LLM 应用普遍采用"提示 → LLM → 响应"的线性模式,但这种架构难以应对真实智能体系统的复杂性。比如生产环境中的多智能体协作需要分支(基于数据选择不同执行路径)、循环(支持重试与迭代优化)、汇合(多个智能体向共享状态写入数据),以及条件路由(根据执行结果动态决定后续流程)。

LangGraph 如何表示工作流

LangGraph 里每个工作流都是一个 StateGraph——本质上是有向图。节点就是智能体,或者说处理状态的函数;边是智能体之间的转换;状态则是在整个图中流动的共享数据结构。

 from langgraph.graph import StateGraph, END  
from typing import TypedDict

# Define your state schema  
class IncidentState(TypedDict):  
    incident_id: str  
    current_metrics: dict  
    proposed_solution: dict  
    issue_resolved: bool  
    retry_count: int

# Create the graph  
workflow = StateGraph(IncidentState)

# Add agent nodes  
workflow.add_node("diagnose", diagnose_agent)  
workflow.add_node("plan_fix", planning_agent)  
workflow.add_node("execute_fix", worker_agent)  
workflow.add_node("verify", verification_agent)

# Define transitions  
workflow.add_edge("diagnose", "plan_fix")  
workflow.add_edge("plan_fix", "execute_fix")  
workflow.add_edge("execute_fix", "verify")

# Conditional: retry or exit  
workflow.add_conditional_edges(  
    "verify",  
    lambda state: "resolved" if state["issue_resolved"] else "retry",  
    {  
        "resolved": END,  
        "retry": "diagnose"  # Loop back  
    }  
)

 workflow.set_entry_point("diagnose")

这样做的好处非常明显:图本身就可以当作开发文档文档,一眼能看懂流程;加减节点不用动协调逻辑;状态有类型约束;循环有内置的终止条件,不会跑成死循环。

节点、边、状态三者各司其职。节点封装具体的逻辑操作,只管做事;边定义节点间怎么交互、谁先谁后;状态承载共享上下文,让节点可以保持无状态。这种职责分离让系统好理解、好调试、好扩展,节点还能跨工作流复用。

运行时到底发生了什么

图定义是声明式的,但真正让编排变得有意义的是运行时行为。

工作流启动后,LangGraph 用状态机来管理执行。首先从入口节点的初始状态开始,然后调用智能体函数并传入当前状态。智能体返回的是增量更新而不是整个状态的替换,LangGraph 拿到更新后原子性地合并到当前状态,接着根据图定义决定下一个节点,同时创建检查点把当前状态和执行位置持久化下来。这个过程一直重复,直到走到 END 节点或者达到最大迭代次数。

有一点很关键:智能体永远不会直接改共享状态。它们拿到的是只读副本,算完之后返回更新,实际的状态修改由 LangGraph 来做,可以保证了原子性和一致性。

边遍历机制

边定义了哪些转换是允许的,但具体什么时候转换由运行时决定。

静态边没什么花样:

 workflow.add_edge("diagnose", "plan_fix")

diagnose 节点跑完、检查点创建好之后,LangGraph 立刻拿更新后的状态去调 plan_fix。

条件边就灵活多了:

 workflow.add_conditional_edges(  
     "verify",  
     route_function,  
     {"retry": "diagnose", "resolved": END}  
 )

verify 完成后,LangGraph 调用 route_function(state) 来判断下一步走哪条边。函数返回 retry 就回到 diagnose,返回 resolved 就结束。

任何节点在执行前它的所有前置节点必须已经完成并创建了检查点,这就避免了 Pub/Sub 系统里常见的那种"前面还没跑完后面就开始了"的问题。

状态管理的特殊之处

LangGraph 的状态跟传统系统不太一样。

它不是存在 Redis 或数据库里让智能体直接访问的共享内存。LangGraph 在内部维护状态,给智能体的是受控访问。对智能体来说状态是不可变的——拿到的是快照,不能直接改,只能返回想要的变更。

多个智能体并行跑的时候(通过并行边),LangGraph 收集所有更新,用 reducer 原子性地一起应用。读-修改-写的竞态条件就这么解决了。

每个检查点还会创建一个状态版本。想看执行历史中任意时刻的状态?直接查检查点就行,这就是所谓的时间旅行调试。

检查点持久化

检查点不只是日志,它们是恢复点。

每个检查点记录完整的状态快照、当前在图中的位置(刚执行完哪个节点)、还有元数据(时间戳、创建检查点的节点、执行路径)。

创建时机有三个:每个节点成功完成后、条件边评估前、以及工作流暂停时(比如等人工审批)。

这样如果节点执行到一半崩了,可以从最后一个检查点重试就行;长时间运行的工作流可以暂停再恢复,进度不会丢;调试的时候能从任意检查点开始重放。

一个完整的运行时示例

假设用户发起请求:"修复服务延迟问题"。

 T0: Workflow starts  
    - Initial state: {incident_id: "INC-123", retry_count: 0}  
    - Entry point: "diagnose"

T1: "diagnose" node executes  
    - Receives: {incident_id: "INC-123", retry_count: 0}  
    - Agent calls Data Agent, fetches metrics  
    - Returns: {current_metrics: {cpu: 95, latency: 500ms}}  
    - LangGraph merges: state now has metrics  
    - Checkpoint created  

T2: Static edge triggers: "diagnose" → "plan_fix"  
    - "plan_fix" node executes  
    - Receives merged state (incident_id + retry_count + current_metrics)  
    - Agent calls Knowledge Agent for runbook  
    - Returns: {proposed_solution: "restart_service"}  
    - LangGraph merges  
    - Checkpoint created

T3: Static edge triggers: "plan_fix" → "execute_fix"  
    - "execute_fix" node executes  
    - Calls Worker Agent  
    - Returns: {action_status: "completed"}  
    - Checkpoint created

T4: Static edge triggers: "execute_fix" → "verify"  
    - "verify" node executes  
    - Calls Data Agent again  
    - Returns: {current_metrics: {cpu: 90, latency: 480ms}, issue_resolved: false}  
    - Checkpoint created

T5: Conditional edge evaluation  
    - LangGraph calls route function with current state  
    - route_function checks: state["issue_resolved"] == false and retry_count < 3  
    - Returns: "retry"  
    - LangGraph increments retry_count  
    - Routes back to "diagnose" (cycle)

T6: "diagnose" executes again (retry [#1](#1))  
     - Process repeats with updated state...

状态在节点间累积——指标、方案、操作结果都在里面。每个节点都能看到之前所有节点产出的完整信息。重试逻辑是图结构强制的,不是写在智能体代码里。出了故障检查点可以让程序随时恢复运行。

用 LangGraph 的话,智能体只管返回自己的更新。协调、状态合并、路由、持久化,运行时全包了。

关键架构模式

传统多智能体系统喜欢累积对话历史:

 # Common pattern - append-only log  
 messages= [  
     {"role": "user", "content": "Service X is slow"},  
     {"role": "data", "content": "CPU at 95%"},  
     {"role": "knowledge", "content": "Try restarting"},  
     {"role": "action", "content": "Restarted service"},  
     ...  
 ]

这东西会无限增长,智能体每次都得在历史里翻来翻去找有用的数据。

LangGraph 换了个思路,状态就是当前世界的快照:

 classState(TypedDict):  
     # Current values, not history  
     incident_id: str  
     current_cpu: float  
     recommended_action: str  
     action_status: str  
     retry_count: int

智能体读当前值、更新当前值。历史通过检查点单独维护,调试用得着,但工作状态保持精简。访问状态 O(1),不用解析历史;数据所有权清晰,一眼看出哪个字段归谁管;推理也简单,当前状态是啥就是啥。

Reducer 解决并行协调

多个智能体要往同一个状态字段写数据怎么办?LangGraph 提供 reducer——专门合并并发更新的函数。

传统 A2A 模型里,智能体得自己搞协调:抢锁、读-修改-写、重试、冲突检测。这套东西各团队实现得五花八门,一旦出现部分故障就容易出问题。Reducer 把冲突解决挪到编排层,智能体级别的协调逻辑直接省掉。

比如说下面的例子,三个监控智能体并行检查不同的服务副本:

 fromtypingimportAnnotated  
 fromoperatorimportadd

 classState(TypedDict):  
     # Reducer: combine all health check results  
     health_checks: Annotated[list, add]

三个 Data Agent 各自返回健康检查结果,reducer(这里就是列表的 add 操作)自动把三份结果合成一个列表。没有智能体需要知道其他智能体的存在,不用抢锁,不用协调更新。

没有 reducer 的话,需要手动加锁防覆盖、写协调逻辑合并结果、还得担心更新丢失。有了 reducer,编排层自动处理。

检查点用于调试和恢复

每次节点执行都会创建检查点,状态和执行位置的快照会持久化到 Postgres、Redis 或文件系统。

生产环境出故障了?可以检查检查点的内容,看看每个智能体观察到了什么、做了什么决定。这相当于给智能体工作流装了黑匣子,决策链条一清二楚。

服务器中途崩了也可以从最后一个检查点恢复,不用从头来。对那些要调用昂贵 API 或者收集大量数据的长时间任务来说,这太重要了。

而且工作流可以暂停几小时甚至几天,状态通过检查点保持现有状态,从暂停的地方精确恢复,上下文完整保留。

修改工作流的灵活性

LangGraph的另外一个卖点是工作流改起来容易。

假设初始工作流是 Diagnose → Fix → Verify,现在要加个需求:"修复之前先查一下 Jira 有没有已知问题"。

代码改动就这么点:

 # Add the new agent  
workflow.add_node("check_jira", jira_agent)

# Rewire the flow  
workflow.add_edge("diagnose", "check_jira")  # New path  
workflow.add_conditional_edges(  
    "check_jira",  
    lambda state: "known_issue" if state["jira_ticket"] else "unknown",  
    {  
        "known_issue": "apply_known_fix",  # New path  
        "unknown": "plan_fix"              # Original path  
    }  
 )

单个智能体的实现不用动,状态协调逻辑不用动,检查点处理不用动,错误恢复不用动。

如果换成换成 Pub/Sub 呢?事件路由逻辑要改,完成跟踪要改(现在是 4 个智能体不是 3 个了),状态模式协调要改,所有集成点都得重新测。

再看重试逻辑的修改。原来是最多重试 3 次:

 # Before  
 workflow.add_conditional_edges(  
     "verify",  
     lambda state: "retry" if state["retry_count"] < 3 else "end",  
     {"retry": "diagnose", "end": END}  
 )

新需求:"只有临时性错误(网络问题)才重试,永久性错误(配置问题)不重试"。改条件函数就行:

 # After - just change the condition function  
def should_retry(state):  
    if state["issue_resolved"]:  
        return "success"  
    if state["error_type"] == "config":  
        return "escalate"  # Don't retry config errors  
    if state["retry_count"] >= 3:  
        return "max_retries"  
    return "retry"

workflow.add_conditional_edges(  
    "verify",  
    should_retry,  
    {  
        "success": END,  
        "retry": "diagnose",  
        "escalate": "human_review",  
        "max_retries": "alert_team"  
    }  
 )

业务逻辑在工作流结构里一目了然,改起来也顺手。

LangGraph 支持的典型模式

生成的方案不够好,可以直接加个循环:

 workflow.add_node("generate_solution", llm_agent)  
workflow.add_node("validate_solution", validation_agent)  
workflow.add_node("refine_solution", refinement_agent)

workflow.add_conditional_edges(  
    "validate_solution",  
    lambdastate: "valid"ifstate["solution_quality"] >0.8else"refine",  
    {  
        "valid": "execute_fix",  
        "refine": "refine_solution"  
    }  
)

 workflow.add_edge("refine_solution", "generate_solution")  # Loop back

方案不断迭代,直到质量达标。

并行信息收集时需要同时从多个来源拉数据:

 fromlanggraph.graphimportSTART

# Parallel nodes  
workflow.add_node("fetch_metrics", data_agent)  
workflow.add_node("fetch_logs", elasticsearch_agent)  
workflow.add_node("fetch_config", knowledge_agent)

# All start in parallel  
workflow.add_edge(START, "fetch_metrics")  
workflow.add_edge(START, "fetch_logs")  
workflow.add_edge(START, "fetch_config")

# All must complete before analysis  
workflow.add_node("analyze", analysis_agent)  
workflow.add_edge("fetch_metrics", "analyze")  
workflow.add_edge("fetch_logs", "analyze")  
 workflow.add_edge("fetch_config", "analyze")

LangGraph 保证 analyze 节点在三个数据源都拿完之后才开始跑。

高风险操作需要人来进行确认:

 workflow.add_node("propose_fix", planning_agent)  
workflow.add_node("await_approval", approval_gate)  
workflow.add_node("execute_fix", action_agent)

workflow.add_edge("propose_fix", "await_approval")

# Workflow pauses at await_approval  
# State is persisted  
# When human approves, workflow resumes

workflow.add_conditional_edges(  
    "await_approval",  
    lambdastate: "approved"ifstate["human_approved"] else"rejected",  
    {  
        "approved": "execute_fix",  
        "rejected": "propose_alternative"  
    }  
 )

这个确认过程可以等几小时甚至几天,不消耗任何的资源。

什么场景适合 LangGraph

复杂工作流(5 个以上智能体、有条件逻辑、有循环)、业务逻辑经常变、需要事后调试分析、有人工审批或质量门控、长时间任务需要崩溃恢复——这些场景 LangGraph 很合适。

简单的线性流程(A → B → C,没分支)、智能体完全独立不需要协调、对延迟极度敏感(编排开销要控制在 10ms 以内)、或者团队有深厚的分布式系统功底想自己搞状态机——这些场景替代方案也挺好。

总结

编排框架在复杂系统中的价值已经被反复验证:Kubernetes 之于容器、Airflow 之于数据管道、Temporal 之于通用工作流。LangGraph 将同样的理念带入多智能体 AI 领域,提供了 LLM 感知的编排能力。

其核心价值在于:图结构让工作流易于修改和扩展,检查点机制保障了可调试性和故障恢复,reducer 和原子状态更新解决了并行协调难题。开发者可以专注于智能体逻辑本身,而非协调管道的实现细节。

对于正在构建多智能体系统的团队,LangGraph 提供了一条从实验原型到生产系统的可行路径。

https://avoid.overfit.cn/post/207f7dd3b4b2488983645d365c9e0b89

作者:ravikiran veldanda

目录
相关文章
|
6天前
|
人工智能 自然语言处理 Shell
🦞 如何在 Moltbot 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
🦞 如何在 Moltbot 配置阿里云百炼 API
|
4天前
|
人工智能 JavaScript 应用服务中间件
零门槛部署本地AI助手:Windows系统Moltbot(Clawdbot)保姆级教程
Moltbot(原Clawdbot)是一款功能全面的智能体AI助手,不仅能通过聊天互动响应需求,还具备“动手”和“跑腿”能力——“手”可读写本地文件、执行代码、操控命令行,“脚”能联网搜索、访问网页并分析内容,“大脑”则可接入Qwen、OpenAI等云端API,或利用本地GPU运行模型。本教程专为Windows系统用户打造,从环境搭建到问题排查,详细拆解全流程,即使无技术基础也能顺利部署本地AI助理。
5632 13
|
10天前
|
人工智能 API 开发者
Claude Code 国内保姆级使用指南:实测 GLM-4.7 与 Claude Opus 4.5 全方案解
Claude Code是Anthropic推出的编程AI代理工具。2026年国内开发者可通过配置`ANTHROPIC_BASE_URL`实现本地化接入:①极速平替——用Qwen Code v0.5.0或GLM-4.7,毫秒响应,适合日常编码;②满血原版——经灵芽API中转调用Claude Opus 4.5,胜任复杂架构与深度推理。
7011 11
|
4天前
|
人工智能 JavaScript API
零门槛部署本地 AI 助手:Clawdbot/Meltbot 部署深度保姆级教程
Clawdbot(Moltbot)是一款智能体AI助手,具备“手”(读写文件、执行代码)、“脚”(联网搜索、分析网页)和“脑”(接入Qwen/OpenAI等API或本地GPU模型)。本指南详解Windows下从Node.js环境搭建、一键安装到Token配置的全流程,助你快速部署本地AI助理。(239字)
3502 19
|
2天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
2763 7
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
|
5天前
|
人工智能 安全 Shell
在 Moltbot (Clawdbot) 里配置调用阿里云百炼 API 完整教程
Moltbot(原Clawdbot)是一款开源AI个人助手,支持通过自然语言控制设备、处理自动化任务,兼容Qwen、Claude、GPT等主流大语言模型。若需在Moltbot中调用阿里云百炼提供的模型能力(如通义千问3系列),需完成API配置、环境变量设置、配置文件编辑等步骤。本文将严格遵循原教程逻辑,用通俗易懂的语言拆解完整流程,涵盖前置条件、安装部署、API获取、配置验证等核心环节,确保不改变原意且无营销表述。
2121 6
|
5天前
|
机器人 API 数据安全/隐私保护
只需3步,无影云电脑一键部署Moltbot(Clawdbot)
本指南详解Moltbot(Clawdbot)部署全流程:一、购买无影云电脑Moltbot专属套餐(含2000核时);二、下载客户端并配置百炼API Key、钉钉APP KEY及QQ通道;三、验证钉钉/群聊交互。支持多端,7×24运行可关闭休眠。
3450 7
|
3天前
|
人工智能 JavaScript 安全
Clawdbot 对接飞书详细教程 手把手搭建你的专属 AI 助手
本教程手把手教你将 Moltbot(原 Clawdbot)部署在 Linux 服务器,并对接飞书打造专属 AI 助手:涵盖环境准备、Node.js/NVM 安装、Moltbot 快速安装(支持 Qwen 模型)、Web 管理面板配置及飞书应用创建、权限设置与事件回调对接,全程图文指引,安全可靠。
2210 3
Clawdbot 对接飞书详细教程 手把手搭建你的专属 AI 助手
|
5天前
|
存储 安全 数据库
使用 Docker 部署 Clawdbot(官方推荐方式)
Clawdbot 是一款开源、本地运行的个人AI助手,支持 WhatsApp、Telegram、Slack 等十余种通信渠道,兼容 macOS/iOS/Android,可渲染实时 Canvas 界面。本文提供基于 Docker Compose 的生产级部署指南,涵盖安全配置、持久化、备份、监控等关键运维实践(官方无预构建镜像,需源码本地构建)。
2426 7
|
5天前
|
人工智能 应用服务中间件 API
刚刚,阿里云上线Clawdbot全套云服务!
阿里云上线Moltbot(原Clawdbot)全套云服务,支持轻量服务器/无影云电脑一键部署,可调用百炼平台百余款千问模型,打通iMessage与钉钉消息通道,打造开箱即用的AI智能体助手。
2742 24
刚刚,阿里云上线Clawdbot全套云服务!