LangChain Agent中间件:六种钩子函数精讲与实战
在深入 LangChain Agent 的钩子中间件之前,先界定一个前提:这些钩子并非可有可无的装饰,而是实现 Agent 行为可观测、可操控、可伸缩的基础设施。下方流程图源自实践笔记,精确标注了 Agent 执行流中六个关键钩子的触发点。建议先花一分钟读懂这张图——后续所有代码都是在为它填充具体实现。
接着,我们构建一个完整的 Agent,依次使用 before_agent、before_model、wrap_model_call、after_model、wrap_tool_call、after_agent 这六种钩子,同时引入 dynamic_prompt 这一特殊形式。每个钩子都会实现一个典型的生产级功能,并附上详细注释。完整运行一遍代码后,你对整个钩子体系的运转就能形成直观把握。
1. 钩子中间件全景图:Agent 执行流中的六种钩子
1.1 流程图:六种钩子的执行顺序
用户输入
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[before_agent] │
│作用:全局初始化、权限校验、注入初始状态 │
│可修改:state │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[动态提示词](dynamic_prompt) │
│作用:根据 state/context 生成 system prompt │
│注:实际是 wrap_model_call 的一种特例,但语义上独立 │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│┌─────────────────────────────────────────────────────────────┐ │
││Agent 主循环(直到停止条件满足) │ │
││ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││[before_model] │ │ │
│││作用:动态修改 messages、请求限速、注入上下文 │ │ │
│││可修改:state, ModelRequest │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││【wrap_model_call】 │ │ │
│││作用:模型替换、缓存、重试、超时控制、错误处理 │ │ │
│││可拦截:可跳过 handler,直接返回响应 │ │ │
│││ │ │ │
│││ handler(request) ──┐ │ │ │
│││ │ │ │ │
│││ ▼ │ │ │
│││ 实际模型调用 │ │ │
│││ (OpenAI/Anthropic) │ │ │
│││ │ │ │ │
│││ ▼ │ │ │
│││┌──────────────────┴─────────────────────────┐ │ │ │
││││ 返回 ModelResponse │ │ │ │
│││└────────────────────────────────────────────┘ │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││[after_model] │ │ │
│││作用:响应验证、敏感词过滤、结果转换、条件路由 │ │ │
│││可修改:ModelResponse │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││ 响应中是否包含工具调用? │ │
│││ │ │
││ ┌─────────┘└─────────┐ │ │
││ 是 否 │ │
││ ▼ ▼ │ │
││┌─────────────────────┐ ┌──────────────┐ │ │
│││ 【wrap_tool_call】 │ │ 结束循环 │ │ │
│││ 包裹每个工具调用 │ │ 退出到 │ │ │
│││ │ after_agent │ │ │
│││ handler(request) ──┐ └──────────────┘ │ │
│││ ││ │ │
│││ ▼│ │ │
│││ 实际工具执行 │ │
│││ (API/DB/计算) │ │
│││ ││ │ │
│││ ▼│ │ │
│││ 返回 ToolMessage │ │
│││ │ │ │
│││作用:参数校验、 │ │ │
│││错误恢复、重试、 │ │ │
│││执行监控、审计 │ │ │
││└─────────────────────┘│ │ │
││ ┌────────────────┘ │ │
││ ▼ │ │
││将 ToolMessage 加入 state["messages"] │ │
││继续下一轮循环(回到 before_model) │ │
│└─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[after_agent] │
│作用:最终结果处理、数据落库、清理资源、汇总指标 │
│可修改:最终输出 state │
└─────────────────────────────────────────────────────────────────────┘
│
▼
最终输出(最终回答)
1.2 关键流转说明
before_agent仅在开始时执行一次。dynamic_prompt本质上是wrap_model_call的封装,每次模型调用前动态生成系统提示。- 模型调用循环(
before_model→wrap_model_call→ 实际模型 →after_model)可能重复多次,直到没有工具调用或满足停止条件。 - 工具调用(
wrap_tool_call)每次执行一个工具,可多次执行(并行或串行)。 after_agent只在所有循环结束后执行一次。
LangChain 的设计保持一致:所有 Node-style 钩子(before_agent、before_model、after_model、after_agent)均接收 state 和 runtime 两个参数。
2. 完整教学代码:六种钩子中间件的综合应用(显式类版本)
2.1 导入依赖和配置日志
"""完整教学代码:六种钩子中间件的综合应用(显式类版本)
使用 TypedDict 定义状态,实现类型安全的中间件开发"""
import asyncio
import time
import logging
from typing import Any, Dict, Optional, Annotated, TypedDict, List
from langchain_core.messages import BaseMessage, ToolMessage, HumanMessage, AIMessage
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import (
before_agent, before_model, after_model, after_agent,
wrap_model_call, wrap_tool_call, dynamic_prompt,
ModelRequest, ModelResponse
)
from langchain.tools import tool
from dotenv import load_dotenv
from langchain.agents.middleware import ModelResponse
# 加载 .env 文件中的环境变量
load_dotenv()
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
这段代码完成的任务:
- 导入所有必需的钩子装饰器和类型。
load_dotenv()从.env文件读取 API Key(例如QWEN_API_KEY)。- 配置日志输出格式,便于观察每个钩子的触发顺序。
3. 定义状态类(类型安全的关键)
3.1 为什么需要显式定义状态?
在 LangChain 中,Agent 的状态(state)是一个字典,包含 messages 对话历史及自定义字段。若不定义类型,IDE 无法提供自动补全,容易误写字段名。使用 TypedDict 可使状态具备类型安全性。
但 TypedDict 与 AgentState 有什么区别?何时选用哪个?原始笔记中有非常详尽的辨析,完整保留如下。
3.2 定义 AppState 类
class AppState(TypedDict):
"""Agent 的完整状态结构,使用 TypedDict 实现类型安全
Python中的TypedDict:让字典更加的准确和安全
Typedict定义必需的键
指定键的类型
在静态检查时发现问题
保持运行时灵活性"""
# messages 使用 add_messages reducer,自动合并而非覆盖
messages: Annotated[List[BaseMessage], "add_messages"]
"""先说 Annotated(adj:带注解的,注释的) 是什么?
Annotated 是 Python 的类型注解增强工具,它允许你给一个类型添加额外的元数据(metadata)。
简单比喻:
普通类型注解:messages: List[BaseMessage] → "这是一个消息列表"
Annotated 注解:messages: Annotated[List[BaseMessage], "add_messages"] → "这是一个消息列表,并且要用 add_messages 规则来合并它"
Annotated[基础类型,额外信息 1, 额外信息 2, ...]
"add_messages": Merges two lists of messages, updating existing messages by ID.
为什么需要 "add_messages"(是LangChain自带的硬编码规则)?
这是 LangGraph/LangChain 的特殊机制!
问题场景:假设有 3 个中间件都要修改 messages:
# 中间件 1
return {"messages": [新消息 1]}
# 中间件 2
return {"messages": [新消息 2]}
# 中间件 3
return {"messages": [新消息 3]}
如果没有 add_messages:后面的返回值会覆盖前面的;最后只剩 [新消息 3],丢失了前两条消息 ❌
有了 add_messages:LangChain 会自动合并所有消息
最终结果:[新消息 1, 新消息 2, 新消息 3] ✅
"""
# 自定义业务字段
user_id: str
start_time: float
call_count: int
before_model_triggered: bool
after_agent_cleanup: bool
# 可选字段(用 NotRequired 标记,Python 3.11+ 可用,这里用 Optional)
intermediate_results: Optional[Dict[str, Any]]
这段代码完成的任务:
messages字段使用Annotated+"add_messages"告知 LangChain:多个中间件返回的messages需要合并而非覆盖。- 自定义字段
user_id、start_time、call_count等用于在钩子之间传递数据。 - 该
AppState类会在create_agent中作为state_schema传入。
4. 定义工具
# ========== 2. 定义工具 ==========
@tool
def get_weather(location: str) -> str:
"""获取指定城市的天气(模拟可能失败的 API)"""
if location.lower() == "error":
raise ValueError("模拟的 API 错误:无法获取天气")
return f"{location} 天气晴朗,温度 25°C"
说明:该工具接收 location 参数,若传入 "error" 则抛出异常,用于测试 wrap_tool_call 的错误恢复机制。
5. 定义钩子函数(核心内容)
5.1 before_agent:Agent 开始前的全局初始化
作用:在 Agent 主循环开始前执行一次,常用于权限校验、初始化计时器、注入用户 ID 等。
# ========== 3. 定义钩子(全部使用显式类型) ==========
# 3.1 before_agent: 全局初始化
@before_agent
def log_agent_start(state: AppState, runtime) -> Optional[Dict[str, Any]]:
"""Agent 开始前触发。参数 state 现在有完整的类型提示,IDE 会自动补全字段。"""
logger.info("=== before_agent: Agent 开始执行 ===")
# 现在可以直接使用 state["user_id"],IDE 会提示这是 str 类型
user_id = runtime.context.get("user_id", "anonymous")
start_time = time.time()
# 返回要更新的 state 字段(类型安全)
return {
"user_id": user_id,
"start_time": start_time,
"call_count": state.get("call_count", 0) + 1,
"intermediate_results": {}
}
执行流程说明:
- 从
runtime.context中获取调用时传入的user_id(参见后续agent.invoke的context参数)。 - 记录开始时间戳。
- 返回字典,这些字段会被合并到当前
state中。 - 后续钩子(如
dynamic_prompt)可通过state["user_id"]访问该值。
5.2 答疑:runtime 是什么?request.state 和 state 的关系?
原始笔记中有一段非常详细的答疑,完整保留如下(已转为普通文本)。
5.3 dynamic_prompt:动态系统提示
作用:它不是传统意义上的“中间件”,而是“提示词生成器”——每次模型调用前,根据当前状态动态生成 system prompt。
# 3.2 dynamic_prompt: 动态系统提示
@dynamic_prompt
def custom_system_prompt(request: ModelRequest) -> str:
"""根据状态动态生成系统提示。注意:dynamic_prompt 的参数是 ModelRequest,不是 state,但可以通过 request.state 访问状态。"""
# 这里 request.state 的类型是 Any,但实际是 AppState
state: AppState = request.state # 类型断言,让 IDE 识别
user_id = state.get("user_id", "访客")
call_count = state.get("call_count", 1)
prompt = f"你是一个智能助手,正在为 {user_id} 服务。"
if call_count > 3:
prompt += " 用户已经多次提问,请提供更简洁的回答。"
logger.info(f"dynamic_prompt 生成系统提示: {prompt[:50]}...")
return prompt
补充说明(来自原始笔记):
执行流程说明:
- 每次模型调用前,LangChain 会执行该函数,将返回的字符串作为本次调用的系统提示。
- 此处根据
user_id和call_count动态调整提示内容。 - 注意:
dynamic_prompt的参数是ModelRequest,而非state,但可通过request.state访问状态。
5.4 before_model:模型调用前的处理
作用:每次模型调用前执行,可修改 messages 或注入额外信息。
# 3.3 before_model: 模型调用前处理
@before_model
def before_model_hook(state: AppState, runtime) -> Optional[Dict[str, Any]]:
"""模型调用前触发,可以修改请求。现在 state 有完整类型,可以直接访问字段。"""
logger.info("=== before_model: 准备调用模型 ===")
# 类型安全:state["messages"] 被 IDE 识别为 List[BaseMessage]
messages = state.get("messages", [])
# 如果消息过多,裁剪(但注意 messages 是 Annotated,这里直接操作可能影响 reducer)
if len(messages) > 10:
logger.warning(f"消息数量过多({len(messages)}),裁剪至最后 5 条")
# 注意:这里直接修改 state 可能不通过 reducer,最好返回更新
# 我们通过返回值来更新
return {"messages": messages[-5:], "before_model_triggered": True}
return {"before_model_triggered": True}
执行流程说明:
- 检查当前
messages列表长度,若超过 10 条则裁剪至最近 5 条(防止上下文过长导致 token 浪费)。 - 通过返回值更新状态,设置
before_model_triggered = True用于调试。
5.5 wrap_model_call:包裹模型调用(实现缓存和重试)
作用:这是最核心的钩子之一。它包裹实际的模型调用函数(handler),允许你在调用前后插入逻辑,例如缓存、重试、超时控制。
# 3.4 wrap_model_call: 包裹模型调用(缓存 + 重试)
@wrap_model_call
def model_call_wrapper(request: ModelRequest, handler):
"""包裹模型调用,实现缓存和重试。request 包含 state,但类型是 Any,我们可以断言。"""
logger.info("=== wrap_model_call: 进入模型调用包装器 ===")
# 类型断言,让 IDE 知道 state 的结构
state: AppState = request.state
# 简单缓存:使用消息内容作为键
cache_key = str(request.messages)[:200]
if not hasattr(model_call_wrapper, "cache"):
model_call_wrapper.cache = {}
cached = model_call_wrapper.cache.get(cache_key)
if cached:
logger.info("wrap_model_call: 缓存命中,直接返回")
return cached
# 重试机制
max_retries = 2
for attempt in range(max_retries + 1):
try:
logger.info(f"wrap_model_call: 第 {attempt + 1} 次尝试")
response = handler(request)
model_call_wrapper.cache[cache_key] = response
logger.info("wrap_model_call: 调用成功")
return response
except Exception as e:
logger.error(f"wrap_model_call: 失败 (尝试 {attempt + 1}): {e}")
if attempt == max_retries:
raise
time.sleep(2 ** attempt)
return None # 不会执行到这里
答疑:hasattr 为什么总和 @wrap_model_call 一起出现?
执行流程说明:
handler(request)是真实的模型调用(或下一个中间件)。- 此处实现了两个功能:
- 缓存:将请求的 messages 转为字符串作为 key,若命中缓存则直接返回缓存的
ModelResponse,不再调用模型。 - 重试:若调用失败(异常),最多重试 2 次,每次等待时间按指数退避(2^attempt 秒)。
- 缓存:将请求的 messages 转为字符串作为 key,若命中缓存则直接返回缓存的
- 注意
hasattr(model_call_wrapper, "cache")用于给函数对象动态添加缓存字典。
5.6 重点解析:中间件函数参数的两种形态
原始笔记中有一段非常清晰的对比,完整保留如下(普通文本)。
5.7 after_model:模型调用后的处理
作用:在模型返回响应后执行,可用于过滤敏感词、添加元数据等。
# 3.5 after_model: 模型调用后处理
@after_model
def after_model_hook(state: AppState, runtime) -> Optional[Dict[str, Any]]:
"""模型调用后触发,可以修改响应。注意:after_model 的参数是 state 字典,不是 ModelResponse。需要从 state 中获取最后一条消息(模型的回答)。"""
logger.info("=== after_model: 模型调用后处理 ===")
# 从 state 中获取消息列表
messages = state.get("messages", [])
if messages:
# 获取最后一条消息(模型的回答)
last_message = messages[-1]
content = last_message.content if hasattr(last_message, 'content') else str(last_message)
# 过滤敏感词
if "敏感词" in content:
content = content.replace("敏感词", "[已过滤]")
logger.warning("after_model: 检测到敏感词并过滤")
# 修改消息内容
if hasattr(last_message, 'content'):
last_message.content = content
# 添加自定义元数据
if hasattr(last_message, "additional_kwargs"):
last_message.additional_kwargs["after_model_processed"] = True
return {"after_model_processed": True}
执行流程说明:
- 从
state["messages"]中取出最后一条消息(即模型刚生成的回答)。 - 检查内容是否包含敏感词,若包含则替换。
- 在消息的
additional_kwargs中添加标记,表示已经过after_model处理。 - 返回
{"after_model_processed": True}更新状态。
5.8 wrap_tool_call:包裹工具调用
作用:包裹每个工具的执行,实现参数校验、错误恢复、重试等。
# 3.6 wrap_tool_call: 包裹工具调用
@wrap_tool_call
def tool_call_wrapper(request, handler): #request 的类型是 ToolCallRequest,不是 ModelRequest。!!!!!
"""包裹工具调用,实现参数校验和错误恢复。request 的类型是 ToolCallRequest,不是 ModelRequest。"""
tool_name = request.tool_call["name"]
args = request.tool_call.get("args", {})
logger.info(f"=== wrap_tool_call: 工具 {tool_name} 被调用,参数: {args} ===")
# 参数校验
if tool_name == "get_weather" and not args.get("location"):
return ToolMessage(
content="错误:location 参数不能为空。",
tool_call_id=request.tool_call["id"]
)
# 重试逻辑
max_retries = 2
for attempt in range(max_retries + 1):
try:
logger.info(f"wrap_tool_call: 第 {attempt + 1} 次尝试")
result = handler(request)
logger.info(f"wrap_tool_call: 工具 {tool_name} 成功")
return result
except Exception as e:
logger.error(f"wrap_tool_call: 失败 (尝试 {attempt + 1}): {e}")
if attempt == max_retries:
return ToolMessage(
content=f"工具执行失败:{str(e)}。请稍后重试。",
tool_call_id=request.tool_call["id"]
)
time.sleep(2 ** attempt)
wrap_tool_call 的调用链解释:
wrap_tool_call 中间件 A
│
└─ handler → wrap_tool_call 中间件 B
│
└─ handler → 实际工具执行函数
handler(request) 与工具调用的关系:
代码位置 实际执行的内容
handler(request) 执行后续中间件(如果有),最终执行真正的工具函数
真正的工具函数 get_weather(location="北京") 等
执行流程说明:
request.tool_call包含工具名称、参数和调用 ID。- 首先进行参数校验:若工具为
get_weather且未提供location,直接返回错误ToolMessage,不再调用实际工具。 - 随后进入重试循环:调用
handler(request)执行实际工具(或下一个中间件),若抛出异常则重试,最多 2 次。 - 重试全部失败后,返回友好的错误
ToolMessage,而不是让 Agent 崩溃。
5.9 after_agent:Agent 执行结束后的清理
作用:在整个 Agent 主循环结束后执行一次,用于汇总指标、记录耗时、清理资源。
# 3.7 after_agent: Agent 执行结束
@after_agent
def after_agent_hook(state: AppState, runtime) -> Optional[Dict[str, Any]]:
"""Agent 执行结束后触发。现在 state 有完整类型,可以直接访问所有字段。"""
logger.info("=== after_agent: Agent 执行结束 ===")
start_time = state.get("start_time")
if start_time:
elapsed = time.time() - start_time
logger.info(f"Agent 总耗时: {elapsed:.2f} 秒")
logger.info(f"用户 {state.get('user_id')} 完成了 {state.get('call_count')} 次调用")
# 返回清理标记
return {"after_agent_cleanup": True}
执行流程说明:
- 从
state中读取start_time(由before_agent设置),计算总耗时。 - 打印用户 ID 和调用次数。
- 返回
after_agent_cleanup = True标记清理完成。
6. 创建 Agent(传入 state_schema 和 middleware)
# ========== 4. 创建 Agent(显式传入 state_schema) ==========
# 使用阿里云通义千问模型(根据你的环境配置)
from langchain_community.chat_models import ChatTongyi
import os
model = ChatTongyi(
model="qwen3-max",
api_key=os.getenv("QWEN_API_KEY"), # 从环境变量读取
)
agent = create_agent(
model=model,
tools=[get_weather],
state_schema=AppState, # 关键:告诉 Agent 使用显式状态结构
middleware=[
log_agent_start, # before_agent
custom_system_prompt, # dynamic_prompt
before_model_hook, # before_model
model_call_wrapper, # wrap_model_call
after_model_hook, # after_model
tool_call_wrapper, # wrap_tool_call
after_agent_hook # after_agent
]
)
说明:
state_schema=AppState让 Agent 知晓状态结构,IDE 可在钩子中提供类型提示。middleware列表的顺序至关重要:before_agent最先执行,随后dynamic_prompt,接着进入主循环(before_model→wrap_model_call→after_model→ 若需要工具则wrap_tool_call),最后after_agent。
7. 执行调用(测试缓存和错误恢复)
# ========== 5. 执行调用 ==========
async def main():
# 第一次调用:正常
print("n========== 第一次调用(正常) ==========")
result1 = agent.invoke(
{
"messages": [HumanMessage(content="北京天气如何?")],
# 可选:初始状态字段
"user_id": "user_123",
"call_count": 0
},
context={"user_id": "user_123"} # context 用于动态提示
)
print("最终回答:", result1["messages"][-1].content)
# 第二次调用:测试缓存(相同消息)
print("n========== 第二次调用(测试缓存) ==========")
result2 = agent.invoke(
{
"messages": [HumanMessage(content="北京天气如何?")],
"user_id": "user_123"
},
context={"user_id": "user_123"}
)
print("最终回答:", result2["messages"][-1].content)
# 第三次调用:测试工具错误恢复
print("n========== 第三次调用(工具错误恢复) ==========")
result3 = agent.invoke(
{
"messages": [HumanMessage(content="error 天气如何?")],
"user_id": "user_123"
},
context={"user_id": "user_123"}
)
print("最终回答:", result3["messages"][-1].content)
# 执行
asyncio.run(main())
执行效果预期:
- 第一次调用:正常调用模型,可能触发工具调用,
wrap_model_call会缓存结果。 - 第二次调用(相同问题):
wrap_model_call命中缓存,直接返回,不实际调用模型(日志会显示“缓存命中”)。 - 第三次调用(
location="error"):工具get_weather会抛出异常,wrap_tool_call会重试 2 次后返回错误消息,Agent 不会崩溃。
8. 总结:中间件的实现方式辨析
原始笔记最后有一个总结,澄清了中间件实现方式的疑惑。
写在最后
本文完整保留了原始 .py 笔记中 95% 以上的注释和讲解,并按照“流程图 → 状态定义 → 工具 → 每个钩子的作用+代码+解释 → 创建 Agent → 执行测试 → 总结”的顺序重新组织。希望通过这份详实的教学代码,你能彻底掌握 LangChain Agent 的钩子中间件体系。
关键 Takeaways:
- 六种钩子覆盖了 Agent 执行的完整生命周期:
before_agent→dynamic_prompt→ 主循环(before_model→wrap_model_call→after_model→wrap_tool_call) →after_agent - Node-style 钩子(
before_agent、before_model、after_model、after_agent)接收(state, runtime),返回状态更新字典 - Wrap-style 钩子(
wrap_model_call、wrap_tool_call)接收(request, handler),返回响应对象 dynamic_prompt是特殊的提示词生成器,参数为ModelRequest,返回字符串- 状态类型安全:使用
TypedDict或AgentState定义状态,配合state_schema传入 - 缓存和重试是
wrap_model_call的典型应用,能大幅提升性能和稳定性 - 工具错误恢复通过
wrap_tool_call实现,避免 Agent 因单个工具失败而崩溃
附录:个人完整代码笔记如下:
#(练习==========完整的钩子函数===========演示:完整教学代码:六种钩子中间件的综合应用
"""下面我们构建一个完整的 Agent,依次使用
before_agent、before_model、wrap_model_call、after_model、wrap_tool_call、after_agent 六种钩子,同时附带 dynamic_prompt 作为特殊形式。
每个钩子都实现了典型的生产级功能,并配有详细注释。
完整的敲完代码之后能够熟悉的掌握一个完整的Agent的钩子函数的运作过程!"""
from langchain_classic.chains.question_answering.map_reduce_prompt import messages
from torch.compiler.config import cache_key_tag
from AI大模型RAG与智能体开发_Agent项目.agent.tools.agent_tools import user_ids
"""用户输入
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[before_agent] │
│作用:全局初始化、权限校验、注入初始状态 │
│可修改:state │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[动态提示词](dynamic_prompt) │
│作用:根据 state/context 生成 system prompt │
│注:实际是 wrap_model_call 的一种特例,但语义上独立 │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│┌─────────────────────────────────────────────────────────────┐ │
││Agent 主循环(直到停止条件满足) │ │
││ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││[before_model] │ │ │
│││作用:动态修改 messages、请求限速、注入上下文 │ │ │
│││可修改:state, ModelRequest │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││【wrap_model_call】 │ │ │
│││作用:模型替换、缓存、重试、超时控制、错误处理 │ │ │
│││可拦截:可跳过 handler,直接返回响应 │ │ │
│││ │ │ │
│││ handler(request) ──┐ │ │ │
│││ │ │ │ │
│││ ▼ │ │ │
│││ 实际模型调用 │ │ │
│││ (OpenAI/Anthropic) │ │ │
│││ │ │ │ │
│││ ▼ │ │ │
│││┌──────────────────┴─────────────────────────┐ │ │ │
││││ 返回 ModelResponse │ │ │ │
│││└────────────────────────────────────────────┘ │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││┌─────────────────────────────────────────────────────┐ │ │
│││[after_model] │ │ │
│││作用:响应验证、敏感词过滤、结果转换、条件路由 │ │ │
│││可修改:ModelResponse │ │ │
││└─────────────────────────────────────────────────────┘ │ │
│││ │ │
││▼ │ │
││ 响应中是否包含工具调用? │ │
│││ │ │
││ ┌─────────┘└─────────┐ │ │
││ 是 否 │ │
││ ▼ ▼ │ │
││┌─────────────────────┐ ┌──────────────┐ │ │
│││ 【wrap_tool_call】 │ │ 结束循环 │ │ │
│││ 包裹每个工具调用 │ │ 退出到 │ │ │
│││ │ after_agent │ │ │
│││ handler(request) ──┐ └──────────────┘ │ │
│││ ││ │ │
│││ ▼│ │ │
│││ 实际工具执行 │ │
│││ (API/DB/计算) │ │
│││ ││ │ │
│││ ▼│ │ │
│││ 返回 ToolMessage │ │
│││ │ │ │
│││作用:参数校验、 │ │ │
│││错误恢复、重试、 │ │ │
│││执行监控、审计 │ │ │
││└─────────────────────┘│ │ │
││ ┌────────────────┘ │ │
││ ▼ │ │
││将 ToolMessage 加入 state["messages"] │ │
││继续下一轮循环(回到 before_model) │ │
│└─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│[after_agent] │
│作用:最终结果处理、数据落库、清理资源、汇总指标 │
│可修改:最终输出 state │
└─────────────────────────────────────────────────────────────────────┘
│
▼
最终输出(最终回答)
关键流转说明:
before_agent 只执行一次,在最开始。
dynamic_prompt 本质上是 wrap_model_call 的封装,在每次模型调用前动态生成系统提示。
模型调用循环(before_model → wrap_model_call → 实际模型 → after_model)可能执行多次,直到没有工具调用或达到停止条件。
工具调用(wrap_tool_call)每次执行一个工具,可能多次执行(并行或串行)。
after_agent 只执行一次,在所有循环结束后。
LangChain 的设计一致性:所有 Node-style 钩子(before_agent, before_model, after_model, after_agent)都接收 state 和 runtime 两个参数"""
"""完整教学代码:六种钩子中间件的综合应用(显式类版本)
使用 TypedDict 定义状态,实现类型安全的中间件开发"""
import asyncio
import time
import logging
from typing import Any, Dict, Optional, Annotated, TypedDict, List
from langchain_core.messages import BaseMessage, ToolMessage, HumanMessage, AIMessage
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import (
before_agent, before_model, after_model, after_agent,
wrap_model_call, wrap_tool_call, dynamic_prompt,
ModelRequest, ModelResponse
)
from langchain.tools import tool
from dotenv import load_dotenv
from langchain.agents.middleware import ModelResponse
# 加载 .env 文件中的环境变量
load_dotenv()
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ========== 1. 显式定义状态类(类型安全) ==========
from langgraph.graph import add_messages
#class AppState(TypedDict):......
#你可能会发现有时候状态继承的是TypeDict有时候却又是AgentState,这两哥们什么关系呢?什么时候用谁?其实还有MessageState
"""AgentState 是 TypedDict 的"孙子类",专门为 LangChain Agent 定制的增强版。
1️⃣ TypedDict - Python 原生版本
from typing import TypedDict
class MyState(TypedDict):
name: str
age: int
hobbies: list[str]
# 使用
person: MyState = {"name": "张三", "age": 25, "hobbies": ["编程"]}
特点:
✅ Python 原生支持
✅ IDE 有类型提示
❌ 没有特殊功能,就是普通字典
❌ messages字段没一簇更新都会覆盖!
2️⃣ AgentState - LangChain 定制版
from langchain.agents import AgentState
from typing import Annotated
from langgraph.graph import add_messages
class UserMemory(AgentState): # ← 继承 AgentState
messages: Annotated[list, add_messages] # ← 自动累积消息
user_preferences: dict
interaction_count: int
特点:
✅ 继承了 TypedDict 的所有能力
✅ 自动管理 messages 字段(不会覆盖,会追加)
✅ LangChain/LangGraph 能识别它
✅ 支持 reducer(状态合并策略)
特别说明:覆盖问题:
# ❌ 不要这样
class MyState(TypedDict): # 直接用 TypedDict
messages: list # 没有 add_messages 注解,会被覆盖!
# ✅ 应该这样
class MyState(AgentState): # 继承 AgentState
messages: Annotated[list, add_messages] # 自动追加
辨析:
TypedDict:? 空白表格模板告诉你有哪些字段要填,但没有任何特殊功能
MessagesState:? 带"历史记录"栏的表格在空白表格基础上,预设了一个会自动追加内容的"历史记录"栏
AgentState:? 智能助手专用表格和 MessagesState 一样,但名字更贴切,专门给 Agent 用的
=======使用场景======:
# ✅ 场景 1:定义 Agent 的状态 → 用 AgentState
from langchain.agents import AgentState
class MyAppState(AgentState):
messages: Annotated[list, add_messages]
custom_field: str # 你的自定义字段
agent = create_agent(model=model, state_schema=MyAppState) # ← 传这个
# ✅ 场景 2:中间件扩展状态 → 可以用 TypedDict
from typing import TypedDict
class UserMemory(TypedDict, total=False):
user_id: str
preferences: dict
class MemoryMiddleware(AgentMiddleware):
state_schema = UserMemory # ← 这里用 TypedDict 没问题
# ✅ 场景 3:简单临时状态 → TypedDict 足够
def some_hook(state: TypedDict, runtime):
# 只是读取状态,不扩展
pass
总结:
核心差异:
TypedDict = 普通字典 + 类型检查
AgentState = TypedDict + 自动管理 messages + LangGraph 优化
使用建议:
做 Agent项目 → 无脑用 AgentState
其他场景 → 用 TypedDict 就够"""
class AppState(TypedDict):
"""Agent 的完整状态结构,使用 TypedDict 实现类型安全
Python中的TypedDict:让字典更加的准确和安全
Typedict定义必需的键
指定键的类型
在静态检查时发现问题
保持运行时灵活性"""
# messages 使用 add_messages reducer,自动合并而非覆盖
messages: Annotated[List[BaseMessage], "add_messages"]
"""先说 Annotated(adj:带注解的,注释的) 是什么?
Annotated 是 Python 的类型注解增强工具,它允许你给一个类型添加额外的元数据(metadata)。
简单比喻:
普通类型注解:messages: List[BaseMessage] → "这是一个消息列表"
Annotated 注解:messages: Annotated[List[BaseMessage], "add_messages"] → "这是一个消息列表,并且要用 add_messages 规则来合并它"
Annotated[基础类型,额外信息 1, 额外信息 2, ...]
"add_messages":Merges two lists of messages, updating existing messages by ID.
为什么需要 "add_messages"(是LangChain自带的硬编码规则)?
这是 LangGraph/LangChain 的特殊机制!
问题场景:假设有 3 个中间件都要修改 messages:
# 中间件 1
return {"messages": [新消息 1]}
# 中间件 2
return {"messages": [新消息 2]}
# 中间件 3
return {"messages": [新消息 3]}
如果没有 add_messages:后面的返回值会覆盖前面的;最后只剩 [新消息 3],丢失了前两条消息 ❌
有了 add_messages:LangChain 会自动合并所有消息
最终结果:[新消息 1, 新消息 2, 新消息 3] ✅
"""
# 自定义业务字段
user_id: str
start_time: float
call_count: int
before_model_triggered: bool
after_agent_cleanup: bool
# 可选字段(用 NotRequired 标记,Python 3.11+ 可用,这里用 Optional)
intermediate_results: Optional[Dict[str, Any]]
