Graph编排进阶:从ReAct到通用DAG实战指南

2026-06-12阅读 0热度 0
其他
好的,作为一位深耕AI编排领域多年的专家,我很乐意将这篇技术内容进行人性化重写。下面就是根据您的需求,完成润色后的文章。 读完这篇你会知道: Graph 编排:不只是 ReAct 的通用 DAG - Graph 能画哪些 ReAct 画不了的图 - 节点、边、分支三个概念怎么组合出复杂流程 - 什么时候该用 Graph,什么时候用 Chain 更简单 - StateGraph 怎么让节点之间共享状态 ## 先看 Eino 的 Graph 能画什么 上一节我们看到了 ReAct 的图:一个 LLM 节点 + 一个工具节点,再加一个分支路由来决定是继续还是结束。这是最简单的循环图。 但 Graph 的能力远不止于此。它能画三种图: ``` ┌─────────────────────────────────────────────────────────────┐ │ Eino Graph 能画什么 │ ├──────────────┬──────────────────────────────────────────────┤ │ 线性管道 │ A → B → C → END │ │ │ 翻译 → 摘要 → 评分 │ ├──────────────┼──────────────────────────────────────────────┤ │ │ ┌─ path1 → D │ │ 条件分支 │ A → B → C ─┤ │ │ │ └─ path2 → E │ │ │ 意图识别 → 路由 → 不同处理 │ ├──────────────┼──────────────────────────────────────────────┤ │ │ ┌──── D ────┐ │ │ 并行+汇聚 │ A → B┤ ├→ F → END │ │ │ └──── E ────┘ │ │ │ 检索 → 并行打分+重排 → 合并 │ ├──────────────┼──────────────────────────────────────────────┤ │ │ ┌─ D ─┐ │ │ 循环(Pregel) │ A → B ─┤├─ B ─→ END │ │ │ └─────┘ │ │ │ ReAct 循环:推理 → 行动 → 再推理 │ └──────────────┴──────────────────────────────────────────────┘ ``` 四种模式,一套 API。区别只在于你怎么连节点和边。 ## 三个基本概念:节点、边、分支 ### 节点(Node):一个执行步骤 节点就是图里的一个“做事的方块”。Eino 提供了 10+ 种节点类型,每种对应一个组件接口: ```go // [compose/generic_graph.go](https://github.com/cloudwego/eino/blob/main/compose/generic_graph.go) g := compose.NewGraph[string, string]() // 最常用:LLM 节点 g.AddChatModelNode("chat", chatModel) // 工具节点 g.AddToolsNode("tools", toolsConfig) // 模板节点 g.AddChatTemplateNode("template", tmpl) // 检索节点 g.AddRetrieverNode("retriever", retriever) // 万能节点:塞任意函数 g.AddLambdaNode("my_logic", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) { return "处理后的: " + input, nil },)) // 甚至可以把另一个图当节点嵌进去 g.AddGraphNode("sub_graph", anotherGraph) ``` 每种 `AddXxxNode` 都是在图里加一个节点。节点之间怎么连?靠边和分支。 ### 边(Edge):数据怎么流 边定义了节点之间的固定连线——数据从上一个节点的输出,流向下一个节点的输入: ```go // [compose/generic_graph.go](https://github.com/cloudwego/eino/blob/main/compose/generic_graph.go) g.AddEdge(compose.START, "template") // 输入 → 模板 g.AddEdge("template", "chat") // 模板 → LLM g.AddEdge("chat", compose.END) // LLM → 输出 ``` 三条边画出了一个最简单的管道:用户输入 → 填模板 → 调 LLM → 返回结果。 边有几个规则: 1. START 和 END 是保留字:`compose.START` 是图入口,`compose.END` 是图出口 2. 边的两端必须先加节点:你不能连一个不存在的节点 3. 不能从 END 出发:终点后面不能有东西 4. 类型必须匹配:上游节点的输出类型必须能赋给下游节点的输入类型(编译期检查) ### 分支(Branch):运行时才决定走哪条路 边是固定的,分支是动态的。程序运行到这里才判断该走哪条路: ```go // [compose/branch.go](https://github.com/cloudwego/eino/blob/main/compose/branch.go) branch := compose.NewGraphBranch( func(ctx context.Context, input string) (string, error) { // 根据输入决定走哪条路 if strings.Contains(input, "天气") { return "weather_agent", nil // → 天气 Agent } return "general_agent", nil // → 通用 Agent }, map[string]bool{ "weather_agent": true, // 允许的目的地 "general_agent": true, }, ) g.AddBranch("router", branch) // 在 router 节点后面加分支 ``` 分支的函数签名是 `func(ctx, input) (目标节点key, error)`,返回值就是“下一个要走的节点”。`endNodes` map 声明了所有合法目的地——如果函数返回了一个不在 map 里的 key,运行时会报错。 ## 实战 1:画一个意图识别 → 分支处理的图 看看最常见的 Graph 实战场景——先让 LLM 识别用户意图,再根据意图路由到不同的处理逻辑。 ```go func BuildIntentGraph(ctx context.Context, model model.BaseChatModel) (compose.Runnable[[]*schema.Message, *schema.Message], error,) { g := compose.NewGraph[[]*schema.Message, *schema.Message]() // ① 意图识别节点:让 LLM 判断用户想干什么 g.AddLambdaNode("intent", compose.InvokableLambda(func(ctx context.Context, msgs []*schema.Message) (string, error) { // 简化版:用最后一条消息的内容判断意图 lastMsg := msgs[len(msgs)-1].Content if strings.Contains(lastMsg, "天气") { return "weather", nil } if strings.Contains(lastMsg, "翻译") { return "translate", nil } return "chat", nil },)) // ② 天气处理 g.AddLambdaNode("weather_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) { return &schema.Message{Role:schema.Assistant,Content: "今天北京晴天,28°C",}, nil },)) // ③ 翻译处理 g.AddLambdaNode("translate_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) { return &schema.Message{Role:schema.Assistant,Content: "Translation result here...",}, nil },)) // ④ 通用对话(用 LLM) g.AddLambdaNode("chat_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) { // 这里实际会调 LLM return &schema.Message{Role:schema.Assistant,Content: "我是通用助手,请问有什么可以帮你?",}, nil },)) // ⑤ 连线 g.AddEdge(compose.START, "intent") g.AddEdge("weather_handler", compose.END) g.AddEdge("translate_handler", compose.END) g.AddEdge("chat_handler", compose.END) // ⑥ 分支路由 g.AddBranch("intent", compose.NewGraphBranch( func(ctx context.Context, intent string) (string, error) { switch intent { case "weather": return "weather_handler", nil case "translate": return "translate_handler", nil default: return "chat_handler", nil } }, map[string]bool{ "weather_handler": true, "translate_handler": true, "chat_handler": true, }, )) // ⑦ 编译 return g.Compile(ctx, compose.WithGraphName("IntentRouter")) } ``` 这张图画出来长这样: ``` START │ ▼ ┌──────────┐ │ intent │ ← 意图识别 │ (Lambda) │ └────┬─────┘ │ ┌────┴──────┬─────────────┐ │ │ │ ▼ ▼ ▼ weather translate chat ← 分支路由 │ │ │ ▼ ▼ ▼ END END END ``` 这就是 Graph 的核心模式:节点定义“做什么”,边定义“按什么顺序”,分支定义“根据条件走哪条路”。 ## 实战 2:画一个 RAG 管道 这是一个来自 `eino-examples` 的真实案例(eino_assistant/orchestration.go),展示了 Graph 在 RAG 场景里的用法: ```go // 真实代码,来自 eino-examples func BuildEinoAgent(ctx context.Context) (compose.Runnable[*UserMessage, *schema.Message], error,) { g := compose.NewGraph[*UserMessage, *schema.Message]() // 4 个节点 g.AddLambdaNode("InputToQuery", ...) // 用户输入 → 查询文本 g.AddRetrieverNode("RedisRetriever", ...) // 查询文本 → 检索文档 g.AddChatTemplateNode("ChatTemplate", ...) // 文档 + 历史 → 提示词 g.AddLambdaNode("ReactAgent", ...) // 提示词 → LLM 回答 // 5 条边 g.AddEdge(compose.START, "InputToQuery") // 输入 → 提取查询 g.AddEdge(compose.START, "InputToHistory") // 输入 → 提取历史(并行!) g.AddEdge("InputToQuery", "RedisRetriever") // 查询 → 检索 g.AddEdge("RedisRetriever", "ChatTemplate") // 检索结果 → 模板 g.AddEdge("InputToHistory", "ChatTemplate") // 历史 → 模板 g.AddEdge("ChatTemplate", "ReactAgent") // 模板 → Agent g.AddEdge("ReactAgent", compose.END) // Agent → 输出 // 关键:用 AllPredecessor 模式,等所有上游节点都完成再执行 return g.Compile(ctx, compose.WithNodeTriggerMode(compose.AllPredecessor)) } ``` 这张图画出来是: ``` ┌─ START ─┐ │ │ ▼ ▼ InputToQuery InputToHistory ← 并行执行 │ │ ▼ │ RedisRetriever │ ← 检索完成 │ │ └────┬─────┘ ▼ ChatTemplate ← 等两个上游都完成 │ ▼ ReactAgent │ ▼ END ``` 注意 `compose.WithNodeTriggerMode(compose.AllPredecessor)` 这一行——它告诉 Graph:ChatTemplate 节点要等 InputToQuery 和 InputToHistory 两个上游都完成后才执行。这就是 DAG(有向无环图)模式。 ## 两种运行模式:Pregel vs DAG Eino 的 Graph 有两种运行模式,在编译时决定(compose/graph.go): | | Pregel 模式 | DAG 模式 | | :--- | :--- | :--- | | 适用 | 有环图(循环) | 无环图(管道) | | 触发 | 任一上游完成就触发下游 | 所有上游完成才触发下游 | | 环检测 | 不检测(允许环) | 编译时检测,有环报错 | | 步数限制 | 需要 MaxRunSteps(防死循环) | 不需要(无环就不会死循环) | | 典型场景 | ReAct 循环 | RAG 管道、并行处理 | 源码里的判断逻辑很简洁(graph.go:680): ```go // 编译时决定运行模式 runType := runTypePregel // 默认 Pregel cb := pregelChannelBuilder // 如果指定了 AllPredecessor 或是 Workflow 类型 → DAG 模式 if (opt.nodeTriggerMode == AllPredecessor) || isWorkflow(g.cmp) { runType = runTypeDAG cb = dagChannelBuilder } ``` DAG 模式的环检测用了一个经典的拓扑排序算法(graph.go:1077): ``` 1. 统计每个节点的入度(有多少条边指向它) 2. 找到所有入度为 0 的节点(没有依赖) 3. "删除"这些节点,把它们下游的入度减 1 4. 重复步骤 2-3 5. 如果最后还有入度 > 0 的节点 → 图里有环 → 报错 ``` ## StateGraph:节点之间共享状态 普通 Graph 的节点之间只能通过边传数据——上游输出什么,下游就接收什么。但当多个节点需要共享一些中间状态时该怎么办? 比如:Agent 运行过程中要统计“调了几次 LLM”、“用了多少 Token”、“用户偏好是什么”——这些不是某个节点的输入输出,而是全局状态。 Eino 用 `WithGenLocalState` 解决这个问题(compose/state.go): ```go // 定义共享状态 type AgentState struct { CallCount int TotalTokens int UserPrefs map[string]string } // 创建带状态的 Graph g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *AgentState { return &AgentState{UserPrefs: make(map[string]string)} }),) // 节点 1:读状态 g.AddLambdaNode("check_prefs", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) { // StatePreHandler 可以在节点执行前读写状态 return input, nil },), // 执行前:从状态里读偏好 compose.WithPreHandler(func(ctx context.Context, in string, state *AgentState) (string, error) { if lang, ok := state.UserPrefs["language"]; ok { return in + " (prefers " + lang + ")", nil } return in, nil }), // 执行后:更新调用计数 compose.WithPostHandler(func(ctx context.Context, out string, state *AgentState) (string, error) { state.CallCount++ return out, nil }), ) ``` `WithPreHandler` 和 `WithPostHandler` 是状态的“钩子”:节点执行前可以读状态做预处理,执行后可以更新状态。状态通过 `context.Context` 在整个图的生命周期内共享。 ## Graph vs Chain:什么时候用哪个 Eino 提供了三种编排方式:Graph、Chain、Workflow。初学者最常问的是“我该用哪个”。 ``` 简单 ──────────────────────────────────────── 复杂 │ │ │ Chain Graph StateGraph/Workflow │ │ │ 线性流程 任意拓扑 需要共享状态 A→B→C 有分支有环 多节点协作 最简单 最灵活 最强大但也最重 ``` 选型原则: | 你的需求 | 用什么 | | :--- | :--- | | 线性管道,没有分支 | Chain(最简单) | | 有分支或循环 | Graph | | 多个节点需要共享状态 | StateGraph(`WithGenLocalState`) | | 不同类型节点间需要字段映射 | Workflow(下篇讲) | Chain 的本质是一个“语法糖”——底层还是编译成 Graph,只是 API 更简洁: ```go // Chain 写法(线性,一行接一行) chain := compose.NewChain[string, string]() chain.AppendChatTemplate(tmpl). // 步骤 1 AppendChatModel(model). // 步骤 2 AppendLambda(postProcess) // 步骤 3 // 等价的 Graph 写法(显式连线) g := compose.NewGraph[string, string]() g.AddChatTemplateNode("step1", tmpl) g.AddChatModelNode("step2", model) g.AddLambdaNode("step3", postProcess) g.AddEdge(compose.START, "step1") g.AddEdge("step1", "step2") g.AddEdge("step2", "step3") g.AddEdge("step3", compose.END) ``` 经验法则:如果能用 Chain 写清楚,就用 Chain。一旦需要“根据条件走不同路”或者“循环”,就必须上 Graph。 ## 编译做了什么 所有编排(Graph/Chain/Workflow)使用前都要调 `Compile()`。编译做了四件事: ``` 1. 类型检查 → 上游输出类型 ≠ 下游输入类型 → 编译报错 2. 拓扑校验 → DAG 模式下有环 → 编译报错 3. 构建 channel → 给每个节点分配数据管道 4. 生成 Runnable → 返回一个可反复调用的执行单元 ``` 源码在 graph.go:674,核心流程: ```go func (g *graph) compile(ctx context.Context, opt *graphCompileOptions) (*composableRunnable, error) { // 1. 决定运行模式 runType := runTypePregel // 默认 if opt.nodeTriggerMode == AllPredecessor { runType = runTypeDAG } // 2. 校验起始/终止节点 if len(g.startNodes) == 0 { return nil, errors.New("start node not set") } if len(g.endNodes) == 0 { return nil, errors.New("end node not set") } // 3. 类型检查:未推断出类型的节点 → 报错 for _, v := range g.toValidateMap { if len(v) > 0 { return nil, fmt.Errorf("类型推断失败: %v", v) } } // 4. DAG 模式:环检测 if runType == runTypeDAG { if err := validateDAG(...); err != nil { return nil, err } } // 5. 构建 runner(执行器) r := &runner{chanSubscribeTo: ..., controlPredecessors: ..., ...} g.compiled = true // 标记已编译,后续不能再加节点/边 return r.toComposableRunnable(), nil } ``` 编译完之后,Graph 就变成了一个 `Runnable[I, O]`——一个类型安全的可执行单元。你可以对它调四种方法: ```go runnable, _ := g.Compile(ctx) result, _ := runnable.Invoke(ctx, input) // 同步调用 stream, _ := runnable.Stream(ctx, input) // 流式调用 result, _ := runnable.Collect(ctx, streamReader) // 消费流 → 聚合 stream, _ := runnable.Transform(ctx, streamReader) // 流 → 流 ``` ## 把图嵌进图:子图 Graph 可以嵌套——一个图作为另一个图的节点。这在构建复杂系统时非常有用: ```go // 先构建一个 RAG 子图 ragGraph := compose.NewGraph[string, []*schema.Document]() ragGraph.AddRetrieverNode("retrieve", retriever) ragGraph.AddLambdaNode("rerank", rerankLambda) ragGraph.AddEdge(compose.START, "retrieve") ragGraph.AddEdge("retrieve", "rerank") ragGraph.AddEdge("rerank", compose.END) // 再构建主图,把 RAG 子图当节点用 mainGraph := compose.NewGraph[string, string]() mainGraph.AddLambdaNode("preprocess", preprocessLambda) mainGraph.AddGraphNode("rag", ragGraph) // ← 子图作为节点 mainGraph.AddChatModelNode("generate", chatModel) mainGraph.AddEdge(compose.START, "preprocess") mainGraph.AddEdge("preprocess", "rag") mainGraph.AddEdge("rag", "generate") mainGraph.AddEdge("generate", compose.END) ``` 源码里用 `AddGraphNode` 方法实现(graph.go:441),内部会把子图编译成一个可执行闭包,嵌进父图的运行时。 ## 可视化:把 Graph 变成 Mermaid 图 Eino 自带一个可视化工具,能把 Graph 编译成 Mermaid 语法(eino-examples/devops/visualize): ```go import "github.com/cloudwego/eino-examples/devops/visualize" // 编译时加一个 callback runnable, err := g.Compile(ctx, compose.WithGraphCompileCallbacks(visualize.NewMermaidGenerator("output/")), compose.WithGraphName("my_graph"), ) ``` 编译完成后会自动生成 Mermaid 文件,复制到 mermaid.live 就能看到可视化流程图。这在调试复杂图时非常有用——光看代码很难理清节点之间的关系,一张图一目了然。 ## 小结 | 概念 | 一句话 | 源码位置 | | :--- | :--- | :--- | | 节点(Node) | 一个执行步骤 | graph.go `AddXxxNode` | | 边(Edge) | 固定的数据流 | graph.go `AddEdge` | | 分支(Branch) | 运行时动态路由 | branch.go `NewGraphBranch` | | Pregel 模式 | 允许环,任一上游完成即触发 | graph.go:680 | | DAG 模式 | 不允许环,所有上游完成才触发 | dag.go | | StateGraph | 节点间共享状态 | state.go `WithGenLocalState` | | 子图 | Graph 嵌 Graph | graph.go:441 `AddGraphNode` | | Compile | 类型检查 + 拓扑校验 + 生成 Runnable | graph.go:674 | 记住三句话: 1. Graph = 节点 + 边 + 分支,能画任何流程 2. 编译时做类型检查和拓扑校验,运行时不会遇到结构错误 3. 能用 Chain 就用 Chain,需要分支或循环才上 Graph 下一篇我们看 Chain 和 Workflow——Graph 的两个“简化版”兄弟,什么时候用它们更省事。 *本文涉及的源码版本:eino main 分支。文中代码已简化以突出核心逻辑,完整实现请参考源码链接。*
免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策