Graph编排进阶:从ReAct到通用DAG实战指南
- Graph 能画哪些 ReAct 画不了的图
- 节点、边、分支三个概念怎么组合出复杂流程
- 什么时候该用 Graph,什么时候用 Chain 更简单
- StateGraph 怎么让节点之间共享状态
## 先看 Eino 的 Graph 能画什么
上一节我们看到了 ReAct 的图:一个 LLM 节点 + 一个工具节点,再加一个分支路由来决定是继续还是结束。这是最简单的循环图。
但 Graph 的能力远不止于此。它能画三种图:
```
┌─────────────────────────────────────────────────────────────┐
│ Eino Graph 能画什么 │
├──────────────┬──────────────────────────────────────────────┤
│ 线性管道 │ A → B → C → END │
│ │ 翻译 → 摘要 → 评分 │
├──────────────┼──────────────────────────────────────────────┤
│ │ ┌─ path1 → D │
│ 条件分支 │ A → B → C ─┤ │
│ │ └─ path2 → E │
│ │ 意图识别 → 路由 → 不同处理 │
├──────────────┼──────────────────────────────────────────────┤
│ │ ┌──── D ────┐ │
│ 并行+汇聚 │ A → B┤ ├→ F → END │
│ │ └──── E ────┘ │
│ │ 检索 → 并行打分+重排 → 合并 │
├──────────────┼──────────────────────────────────────────────┤
│ │ ┌─ D ─┐ │
│ 循环(Pregel) │ A → B ─┤├─ B ─→ END │
│ │ └─────┘ │
│ │ ReAct 循环:推理 → 行动 → 再推理 │
└──────────────┴──────────────────────────────────────────────┘
```
四种模式,一套 API。区别只在于你怎么连节点和边。
## 三个基本概念:节点、边、分支
### 节点(Node):一个执行步骤
节点就是图里的一个“做事的方块”。Eino 提供了 10+ 种节点类型,每种对应一个组件接口:
```go
// [compose/generic_graph.go](https://github.com/cloudwego/eino/blob/main/compose/generic_graph.go)
g := compose.NewGraph[string, string]()
// 最常用:LLM 节点
g.AddChatModelNode("chat", chatModel)
// 工具节点
g.AddToolsNode("tools", toolsConfig)
// 模板节点
g.AddChatTemplateNode("template", tmpl)
// 检索节点
g.AddRetrieverNode("retriever", retriever)
// 万能节点:塞任意函数
g.AddLambdaNode("my_logic", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
return "处理后的: " + input, nil
},))
// 甚至可以把另一个图当节点嵌进去
g.AddGraphNode("sub_graph", anotherGraph)
```
每种 `AddXxxNode` 都是在图里加一个节点。节点之间怎么连?靠边和分支。
### 边(Edge):数据怎么流
边定义了节点之间的固定连线——数据从上一个节点的输出,流向下一个节点的输入:
```go
// [compose/generic_graph.go](https://github.com/cloudwego/eino/blob/main/compose/generic_graph.go)
g.AddEdge(compose.START, "template") // 输入 → 模板
g.AddEdge("template", "chat") // 模板 → LLM
g.AddEdge("chat", compose.END) // LLM → 输出
```
三条边画出了一个最简单的管道:用户输入 → 填模板 → 调 LLM → 返回结果。
边有几个规则:
1. START 和 END 是保留字:`compose.START` 是图入口,`compose.END` 是图出口
2. 边的两端必须先加节点:你不能连一个不存在的节点
3. 不能从 END 出发:终点后面不能有东西
4. 类型必须匹配:上游节点的输出类型必须能赋给下游节点的输入类型(编译期检查)
### 分支(Branch):运行时才决定走哪条路
边是固定的,分支是动态的。程序运行到这里才判断该走哪条路:
```go
// [compose/branch.go](https://github.com/cloudwego/eino/blob/main/compose/branch.go)
branch := compose.NewGraphBranch(
func(ctx context.Context, input string) (string, error) {
// 根据输入决定走哪条路
if strings.Contains(input, "天气") {
return "weather_agent", nil // → 天气 Agent
}
return "general_agent", nil // → 通用 Agent
},
map[string]bool{
"weather_agent": true, // 允许的目的地
"general_agent": true,
},
)
g.AddBranch("router", branch) // 在 router 节点后面加分支
```
分支的函数签名是 `func(ctx, input) (目标节点key, error)`,返回值就是“下一个要走的节点”。`endNodes` map 声明了所有合法目的地——如果函数返回了一个不在 map 里的 key,运行时会报错。
## 实战 1:画一个意图识别 → 分支处理的图
看看最常见的 Graph 实战场景——先让 LLM 识别用户意图,再根据意图路由到不同的处理逻辑。
```go
func BuildIntentGraph(ctx context.Context, model model.BaseChatModel) (compose.Runnable[[]*schema.Message, *schema.Message], error,) {
g := compose.NewGraph[[]*schema.Message, *schema.Message]()
// ① 意图识别节点:让 LLM 判断用户想干什么
g.AddLambdaNode("intent", compose.InvokableLambda(func(ctx context.Context, msgs []*schema.Message) (string, error) {
// 简化版:用最后一条消息的内容判断意图
lastMsg := msgs[len(msgs)-1].Content
if strings.Contains(lastMsg, "天气") {
return "weather", nil
}
if strings.Contains(lastMsg, "翻译") {
return "translate", nil
}
return "chat", nil
},))
// ② 天气处理
g.AddLambdaNode("weather_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) {
return &schema.Message{Role:schema.Assistant,Content: "今天北京晴天,28°C",}, nil
},))
// ③ 翻译处理
g.AddLambdaNode("translate_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) {
return &schema.Message{Role:schema.Assistant,Content: "Translation result here...",}, nil
},))
// ④ 通用对话(用 LLM)
g.AddLambdaNode("chat_handler", compose.InvokableLambda(func(ctx context.Context, intent string) (*schema.Message, error) {
// 这里实际会调 LLM
return &schema.Message{Role:schema.Assistant,Content: "我是通用助手,请问有什么可以帮你?",}, nil
},))
// ⑤ 连线
g.AddEdge(compose.START, "intent")
g.AddEdge("weather_handler", compose.END)
g.AddEdge("translate_handler", compose.END)
g.AddEdge("chat_handler", compose.END)
// ⑥ 分支路由
g.AddBranch("intent", compose.NewGraphBranch(
func(ctx context.Context, intent string) (string, error) {
switch intent {
case "weather":
return "weather_handler", nil
case "translate":
return "translate_handler", nil
default:
return "chat_handler", nil
}
},
map[string]bool{
"weather_handler": true,
"translate_handler": true,
"chat_handler": true,
},
))
// ⑦ 编译
return g.Compile(ctx, compose.WithGraphName("IntentRouter"))
}
```
这张图画出来长这样:
```
START
│
▼
┌──────────┐
│ intent │ ← 意图识别
│ (Lambda) │
└────┬─────┘
│
┌────┴──────┬─────────────┐
│ │ │
▼ ▼ ▼
weather translate chat ← 分支路由
│ │ │
▼ ▼ ▼
END END END
```
这就是 Graph 的核心模式:节点定义“做什么”,边定义“按什么顺序”,分支定义“根据条件走哪条路”。
## 实战 2:画一个 RAG 管道
这是一个来自 `eino-examples` 的真实案例(eino_assistant/orchestration.go),展示了 Graph 在 RAG 场景里的用法:
```go
// 真实代码,来自 eino-examples
func BuildEinoAgent(ctx context.Context) (compose.Runnable[*UserMessage, *schema.Message], error,) {
g := compose.NewGraph[*UserMessage, *schema.Message]()
// 4 个节点
g.AddLambdaNode("InputToQuery", ...) // 用户输入 → 查询文本
g.AddRetrieverNode("RedisRetriever", ...) // 查询文本 → 检索文档
g.AddChatTemplateNode("ChatTemplate", ...) // 文档 + 历史 → 提示词
g.AddLambdaNode("ReactAgent", ...) // 提示词 → LLM 回答
// 5 条边
g.AddEdge(compose.START, "InputToQuery") // 输入 → 提取查询
g.AddEdge(compose.START, "InputToHistory") // 输入 → 提取历史(并行!)
g.AddEdge("InputToQuery", "RedisRetriever") // 查询 → 检索
g.AddEdge("RedisRetriever", "ChatTemplate") // 检索结果 → 模板
g.AddEdge("InputToHistory", "ChatTemplate") // 历史 → 模板
g.AddEdge("ChatTemplate", "ReactAgent") // 模板 → Agent
g.AddEdge("ReactAgent", compose.END) // Agent → 输出
// 关键:用 AllPredecessor 模式,等所有上游节点都完成再执行
return g.Compile(ctx, compose.WithNodeTriggerMode(compose.AllPredecessor))
}
```
这张图画出来是:
```
┌─ START ─┐
│ │
▼ ▼
InputToQuery InputToHistory ← 并行执行
│ │
▼ │
RedisRetriever │ ← 检索完成
│ │
└────┬─────┘
▼
ChatTemplate ← 等两个上游都完成
│
▼
ReactAgent
│
▼
END
```
注意 `compose.WithNodeTriggerMode(compose.AllPredecessor)` 这一行——它告诉 Graph:ChatTemplate 节点要等 InputToQuery 和 InputToHistory 两个上游都完成后才执行。这就是 DAG(有向无环图)模式。
## 两种运行模式:Pregel vs DAG
Eino 的 Graph 有两种运行模式,在编译时决定(compose/graph.go):
| | Pregel 模式 | DAG 模式 |
| :--- | :--- | :--- |
| 适用 | 有环图(循环) | 无环图(管道) |
| 触发 | 任一上游完成就触发下游 | 所有上游完成才触发下游 |
| 环检测 | 不检测(允许环) | 编译时检测,有环报错 |
| 步数限制 | 需要 MaxRunSteps(防死循环) | 不需要(无环就不会死循环) |
| 典型场景 | ReAct 循环 | RAG 管道、并行处理 |
源码里的判断逻辑很简洁(graph.go:680):
```go
// 编译时决定运行模式
runType := runTypePregel // 默认 Pregel
cb := pregelChannelBuilder
// 如果指定了 AllPredecessor 或是 Workflow 类型 → DAG 模式
if (opt.nodeTriggerMode == AllPredecessor) || isWorkflow(g.cmp) {
runType = runTypeDAG
cb = dagChannelBuilder
}
```
DAG 模式的环检测用了一个经典的拓扑排序算法(graph.go:1077):
```
1. 统计每个节点的入度(有多少条边指向它)
2. 找到所有入度为 0 的节点(没有依赖)
3. "删除"这些节点,把它们下游的入度减 1
4. 重复步骤 2-3
5. 如果最后还有入度 > 0 的节点 → 图里有环 → 报错
```
## StateGraph:节点之间共享状态
普通 Graph 的节点之间只能通过边传数据——上游输出什么,下游就接收什么。但当多个节点需要共享一些中间状态时该怎么办?
比如:Agent 运行过程中要统计“调了几次 LLM”、“用了多少 Token”、“用户偏好是什么”——这些不是某个节点的输入输出,而是全局状态。
Eino 用 `WithGenLocalState` 解决这个问题(compose/state.go):
```go
// 定义共享状态
type AgentState struct {
CallCount int
TotalTokens int
UserPrefs map[string]string
}
// 创建带状态的 Graph
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *AgentState {
return &AgentState{UserPrefs: make(map[string]string)}
}),)
// 节点 1:读状态
g.AddLambdaNode("check_prefs", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// StatePreHandler 可以在节点执行前读写状态
return input, nil
},),
// 执行前:从状态里读偏好
compose.WithPreHandler(func(ctx context.Context, in string, state *AgentState) (string, error) {
if lang, ok := state.UserPrefs["language"]; ok {
return in + " (prefers " + lang + ")", nil
}
return in, nil
}),
// 执行后:更新调用计数
compose.WithPostHandler(func(ctx context.Context, out string, state *AgentState) (string, error) {
state.CallCount++
return out, nil
}),
)
```
`WithPreHandler` 和 `WithPostHandler` 是状态的“钩子”:节点执行前可以读状态做预处理,执行后可以更新状态。状态通过 `context.Context` 在整个图的生命周期内共享。
## Graph vs Chain:什么时候用哪个
Eino 提供了三种编排方式:Graph、Chain、Workflow。初学者最常问的是“我该用哪个”。
```
简单 ──────────────────────────────────────── 复杂
│ │ │
Chain Graph StateGraph/Workflow
│ │ │
线性流程 任意拓扑 需要共享状态
A→B→C 有分支有环 多节点协作
最简单 最灵活 最强大但也最重
```
选型原则:
| 你的需求 | 用什么 |
| :--- | :--- |
| 线性管道,没有分支 | Chain(最简单) |
| 有分支或循环 | Graph |
| 多个节点需要共享状态 | StateGraph(`WithGenLocalState`) |
| 不同类型节点间需要字段映射 | Workflow(下篇讲) |
Chain 的本质是一个“语法糖”——底层还是编译成 Graph,只是 API 更简洁:
```go
// Chain 写法(线性,一行接一行)
chain := compose.NewChain[string, string]()
chain.AppendChatTemplate(tmpl). // 步骤 1
AppendChatModel(model). // 步骤 2
AppendLambda(postProcess) // 步骤 3
// 等价的 Graph 写法(显式连线)
g := compose.NewGraph[string, string]()
g.AddChatTemplateNode("step1", tmpl)
g.AddChatModelNode("step2", model)
g.AddLambdaNode("step3", postProcess)
g.AddEdge(compose.START, "step1")
g.AddEdge("step1", "step2")
g.AddEdge("step2", "step3")
g.AddEdge("step3", compose.END)
```
经验法则:如果能用 Chain 写清楚,就用 Chain。一旦需要“根据条件走不同路”或者“循环”,就必须上 Graph。
## 编译做了什么
所有编排(Graph/Chain/Workflow)使用前都要调 `Compile()`。编译做了四件事:
```
1. 类型检查 → 上游输出类型 ≠ 下游输入类型 → 编译报错
2. 拓扑校验 → DAG 模式下有环 → 编译报错
3. 构建 channel → 给每个节点分配数据管道
4. 生成 Runnable → 返回一个可反复调用的执行单元
```
源码在 graph.go:674,核心流程:
```go
func (g *graph) compile(ctx context.Context, opt *graphCompileOptions) (*composableRunnable, error) {
// 1. 决定运行模式
runType := runTypePregel // 默认
if opt.nodeTriggerMode == AllPredecessor {
runType = runTypeDAG
}
// 2. 校验起始/终止节点
if len(g.startNodes) == 0 { return nil, errors.New("start node not set") }
if len(g.endNodes) == 0 { return nil, errors.New("end node not set") }
// 3. 类型检查:未推断出类型的节点 → 报错
for _, v := range g.toValidateMap {
if len(v) > 0 { return nil, fmt.Errorf("类型推断失败: %v", v) }
}
// 4. DAG 模式:环检测
if runType == runTypeDAG {
if err := validateDAG(...); err != nil { return nil, err }
}
// 5. 构建 runner(执行器)
r := &runner{chanSubscribeTo: ..., controlPredecessors: ..., ...}
g.compiled = true // 标记已编译,后续不能再加节点/边
return r.toComposableRunnable(), nil
}
```
编译完之后,Graph 就变成了一个 `Runnable[I, O]`——一个类型安全的可执行单元。你可以对它调四种方法:
```go
runnable, _ := g.Compile(ctx)
result, _ := runnable.Invoke(ctx, input) // 同步调用
stream, _ := runnable.Stream(ctx, input) // 流式调用
result, _ := runnable.Collect(ctx, streamReader) // 消费流 → 聚合
stream, _ := runnable.Transform(ctx, streamReader) // 流 → 流
```
## 把图嵌进图:子图
Graph 可以嵌套——一个图作为另一个图的节点。这在构建复杂系统时非常有用:
```go
// 先构建一个 RAG 子图
ragGraph := compose.NewGraph[string, []*schema.Document]()
ragGraph.AddRetrieverNode("retrieve", retriever)
ragGraph.AddLambdaNode("rerank", rerankLambda)
ragGraph.AddEdge(compose.START, "retrieve")
ragGraph.AddEdge("retrieve", "rerank")
ragGraph.AddEdge("rerank", compose.END)
// 再构建主图,把 RAG 子图当节点用
mainGraph := compose.NewGraph[string, string]()
mainGraph.AddLambdaNode("preprocess", preprocessLambda)
mainGraph.AddGraphNode("rag", ragGraph) // ← 子图作为节点
mainGraph.AddChatModelNode("generate", chatModel)
mainGraph.AddEdge(compose.START, "preprocess")
mainGraph.AddEdge("preprocess", "rag")
mainGraph.AddEdge("rag", "generate")
mainGraph.AddEdge("generate", compose.END)
```
源码里用 `AddGraphNode` 方法实现(graph.go:441),内部会把子图编译成一个可执行闭包,嵌进父图的运行时。
## 可视化:把 Graph 变成 Mermaid 图
Eino 自带一个可视化工具,能把 Graph 编译成 Mermaid 语法(eino-examples/devops/visualize):
```go
import "github.com/cloudwego/eino-examples/devops/visualize"
// 编译时加一个 callback
runnable, err := g.Compile(ctx,
compose.WithGraphCompileCallbacks(visualize.NewMermaidGenerator("output/")),
compose.WithGraphName("my_graph"),
)
```
编译完成后会自动生成 Mermaid 文件,复制到 mermaid.live 就能看到可视化流程图。这在调试复杂图时非常有用——光看代码很难理清节点之间的关系,一张图一目了然。
## 小结
| 概念 | 一句话 | 源码位置 |
| :--- | :--- | :--- |
| 节点(Node) | 一个执行步骤 | graph.go `AddXxxNode` |
| 边(Edge) | 固定的数据流 | graph.go `AddEdge` |
| 分支(Branch) | 运行时动态路由 | branch.go `NewGraphBranch` |
| Pregel 模式 | 允许环,任一上游完成即触发 | graph.go:680 |
| DAG 模式 | 不允许环,所有上游完成才触发 | dag.go |
| StateGraph | 节点间共享状态 | state.go `WithGenLocalState` |
| 子图 | Graph 嵌 Graph | graph.go:441 `AddGraphNode` |
| Compile | 类型检查 + 拓扑校验 + 生成 Runnable | graph.go:674 |
记住三句话:
1. Graph = 节点 + 边 + 分支,能画任何流程
2. 编译时做类型检查和拓扑校验,运行时不会遇到结构错误
3. 能用 Chain 就用 Chain,需要分支或循环才上 Graph
下一篇我们看 Chain 和 Workflow——Graph 的两个“简化版”兄弟,什么时候用它们更省事。
*本文涉及的源码版本:eino main 分支。文中代码已简化以突出核心逻辑,完整实现请参考源码链接。*