LangGraph知识库调用、记忆持久化与Agent可视化评测

2026-06-23阅读 0热度 0
其他

前记:上篇文章结尾提到还有 LangGraph 调用知识库、记忆持久化、智能体链路可视化等还没介绍,本文就接着介绍这些内容。

01

LangGraph 知识库查询 和 Active RAG (主动式 RAG)

1、LangGraph 知识库(向量库)查询

LangGraph 本身并不具备查询能力,它只是通过 Python 函数去调用 LangChain 的组件(Retriever)或数据库的原生 SDK。在图中,查询向量库通常被封装在一个标准的 Python 函数(Node)中,流程很清晰:

  • 入参:从 LangGraph 的 State 中拿到用户的问题;
  • 动作:在函数内部调用向量库接口——这是最关键的一步;
  • 出参:把查到的文档(Documents)塞回 State,供后续节点使用。

常见的实现方式有两种:直接调用 Retriever(适合自定义工作流)、作为 Tool 供 Agent 调用(适合 ReAct 架构)。先看看第一种,直接调用 Retriever 的代码:

# 1. 准备工作:在图构建之外,先定义好 Retriever
# 这一步纯粹是 LangChain 的写法,跟 LangGraph 无关
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# 假设这里是你连接向量库的代码
vectorstore = Chroma(persist_directory="./db", embedding_function=OpenAIEmbeddings())
# 将向量库转换为检索器对象
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# ==========================================
# 2. 核心:在 LangGraph 节点中调用它
# ==========================================
def retrieve_node(state):
    """这个函数就是 LangGraph 中的一个 Node。"""
    print("--- 正在查询向量库 ---")
    # [第一步] 从 State 获取问题
    question = state["question"]
    # [第二步] 执行查询动作
    # 这里直接调用 LangChain 的 invoke 接口
    # 它会自动完成:问题 Embedding -> 向量搜索 -> 返回 Document 列表
    documents = retriever.invoke(question)
    # [第三步] 更新 State
    # 返回的字典会自动合并到全局 State 中
    return {"context": documents}

接下来我们用 LangChain 的 InMemoryVectorStore(内存向量库)和 FakeEmbeddings(模拟 Embeddings)来跑一个完整示例,不需要 LLM API Key 或外部向量库,模拟最简化的流程:

  1. 构建知识库:初始化内存向量库,存入几条关于“公司政策”的 Mock 数据;
  2. 定义图:使用 LangGraph 构建一个包含 retrieve 节点的图;
  3. 运行:输入问题,查看图是如何把向量库的数据查出来并更新到状态(State)里的。

安装基础依赖 pip install langgraph langchain langchain-community,然后看代码:

import operator
from typing import Annotated, List, TypedDict
# LangChain 组件
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.runnables import RunnableLambda
# LangGraph 组件
from langgraph.graph import StateGraph, END

# --- 1. 准备 Mock 向量库数据 ---
print("--- [1] 初始化向量库 (Mock) ---")
embeddings = FakeEmbeddings(size=768)
vector_store = InMemoryVectorStore(embeddings)
mock_docs = [
    Document(page_content="差旅费报销政策:所有超过 500 元的支出都需要发片。", metadata={"source": "policy_v1"}),
    Document(page_content="年假政策:员工入职满一年后,每年享有 10 天带薪年假。", metadata={"source": "hr_handbook"}),
    Document(page_content="远程办公政策:每周三允许员工在家办公。", metadata={"source": "remote_work"}),
]
vector_store.add_documents(mock_docs)
print(f"成功存入 {len(mock_docs)} 条文档到内存向量库。")
retriever = vector_store.as_retriever(search_kwargs={"k": 1})

# --- 2. 定义 LangGraph 的状态 (State) ---
class AgentState(TypedDict):
    question: str
    retrieved_docs: List[str]

# --- 3. 定义节点 (Node) ---
def retrieve_node(state: AgentState):
    question = state["question"]
    print(f"--- [Node: Retrieve] 正在检索关于 '{question}' 的内容 ---")
    documents = retriever.invoke(question)
    for i, doc in enumerate(documents):
        print(f"> 找到文档 {i+1}: {doc.page_content}")
    return {"retrieved_docs": [doc.page_content for doc in documents]}

# --- 4. 构建图 (Graph) ---
workflow = StateGraph(AgentState)
workflow.add_node("retrieve_step", retrieve_node)
workflow.set_entry_point("retrieve_step")
workflow.add_edge("retrieve_step", END)
app = workflow.compile()

# --- 5. 运行测试 ---
if __name__ == "__main__":
    print("--- [5] 开始运行 LangGraph ---")
    inputs = {"question": "差旅费报销"}
    result = app.invoke(inputs)
    print("--- [6] 最终状态 (Final State) ---")
    print(f"用户问题: {result['question']}")
    print(f"检索结果: {result['retrieved_docs']}")

代码运行结果如下:

2、LangGraph 的 RAG = Active RAG(主动式 RAG)

如果说 MCP 解决了“手脚”怎么动的问题,那么 LangGraph 的 RAG 就是解决了“大脑”怎么查阅资料并自我修正的问题。在传统的 Chain(链式)结构中,RAG 往往是“一锤子买卖”:检索 -> 生成。如果检索错了,回答也就错了。

但在 LangGraph 中,RAG 变成了一个“不知疲倦的研究员”:它会先检索,读完觉得不对,换个关键词再检索,直到找到满意的资料为止。这种模式被称为 Active RAG(主动式 RAG)或 Self-RAG。

这里先简单了解概念,后续有机会再深入研究。

02

LangGraph 记忆持久化

之前的文章介绍过,LangGraph 的记忆分为上下文管理和长期持久化记忆。上下文是通过 State 来管理的,State 就是上下文的窗口,所有节点函数中定义的 state 参数,就是当前的全部上下文。

基于 LangGraph 官方文档(Persistence 部分),可以将 LangGraph 的记忆与持久化机制清晰地划分为两个核心概念:Checkpoints(检查点/短时记忆)Store(存储/长时记忆)

1、Checkpoints(检查点):会话级持久化

Checkpoint 是对 Graph State(图状态)的快照。它的核心作用是“记录当下,以便回溯”。它与特定的 thread_id(线程/会话 ID)强绑定。主要功能包括:

  • 容错与恢复 (Fault Tolerance):如果程序崩溃,只要有 thread_id,就可以从上一个步骤中断的地方继续运行,而不用从头开始。
  • 时光倒流 (Time Tra vel):既然保存了每一步的快照,你可以随时查看(Get)甚至回滚到之前的某个步骤。
  • 人机交互 (Human-in-the-loop):允许程序暂停(等待人类审批),人类修改状态后,程序基于修改后的状态继续运行。

2、Store(存储):跨会话/长久记忆

Store 是一个层级化的 Key-Value 数据库。它的核心作用是“跨越时空共享数据”。它不依赖于当前的对话上下文,数据可以跨 thread_id 存在。主要功能包括:

  • 跨会话记忆 (Cross-Thread Memory):用户昨天(Thread A)说喜欢红色,今天(Thread B)开启新对话时,Agent 依然能通过查询 Store 知道他喜欢红色。
  • 共享知识:可以存储所有 Agent 共享的全局规则或知识。
  • 语义搜索:高级的 Store 实现(如 PostgresStore)支持通过向量(Embeddings)进行语义检索,即使记不清 Key 也能查到相关记忆。

3、总结对比

4、LangGraph 中两种持久化方式代码实战

安装依赖包 pip install langgraph langchain-core,然后看代码:

import uuid
from typing import Annotated, Literal, TypedDict, Union, Dict, Any
from datetime import datetime
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage, BaseMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySa ver
from langgraph.store.memory import InMemoryStore

# ==========================================
# 1. 生产级数据结构定义 (Schema)
# ==========================================
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

class UserProfile(TypedDict):
    name: str | None
    preference: str | None
    last_updated: str

# ==========================================
# 2. 模拟 LLM(无需 API Key)
# ==========================================
class MockLLM:
    def invoke(self, messages: list, user_profile: dict):
        last_msg = messages[-1].content
        if "我叫" in last_msg:
            name = last_msg.split("我叫")[-1].strip()
            print(f"[MockLLM] 检测到用户信息,决定调用工具保存: {name}")
            return AIMessage(content="", 
                tool_calls=[{"id": "call_"+str(uuid.uuid4())[:8],
                             "name": "update_profile",
                             "args": {"name": name}}])
        elif "我是谁" in last_msg or "记得" in last_msg:
            name = user_profile.get("name", "未知用户")
            return AIMessage(content=f"你是 {name}啊!这是我从长期记忆里读出来的。")
        elif "刚才" in last_msg and ("说" in last_msg or "什么" in last_msg):
            user_messages = [msg for msg in messages[:-1] if isinstance(msg, HumanMessage)]
            if user_messages:
                previous_msg = user_messages[-1].content
                print(f"[MockLLM] 从 Checkpointer 恢复的对话历史中找到:{previous_msg}")
                return AIMessage(content=f"你刚才说:'{previous_msg}'。这是我从 Checkpointer 保存的对话历史中读取的!")
            else:
                return AIMessage(content="我没有找到你之前说的话。")
        else:
            return AIMessage(content=f"收到了:{last_msg}")

model = MockLLM()

# ==========================================
# 3. 定义节点 (Nodes)
# ==========================================
long_term_store = None

def agent_node(state: AgentState, config: RunnableConfig):
    user_id = config["configurable"]["user_id"]
    stored_data = long_term_store.get(("users", user_id), "profile")
    user_profile = stored_data.value if stored_data else {"name": None}
    print(f"--- [Node: Agent] 当前加载的用户画像: {user_profile} ---")
    response = model.invoke(state["messages"], user_profile)
    return {"messages": [response]}

def update_profile_tool(state: AgentState, config: RunnableConfig):
    user_id = config["configurable"]["user_id"]
    last_message = state["messages"][-1]
    tool_call = last_message.tool_calls[0]
    new_name = tool_call["args"]["name"]
    print(f"--- [Node: Tool] 正在将 '{new_name}' 写入长期存储 Store ---")
    new_profile: UserProfile = {
        "name": new_name,
        "preference": "default",
        "last_updated": datetime.now().isoformat()
    }
    long_term_store.put(("users", user_id), "profile", new_profile)
    return {"messages": [ToolMessage(tool_call_id=tool_call["id"],
                                     content=f"已更新用户资料: Name={new_name}")]}

def should_continue(state: AgentState) -> Literal["update_profile", "__end__"]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "update_profile"
    return "__end__"

# ==========================================
# 4. 构建图
# ==========================================
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("update_profile", update_profile_tool)
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("update_profile", "agent")

checkpointer = MemorySa ver()
long_term_store = InMemoryStore()
app = workflow.compile(checkpointer=checkpointer, store=long_term_store)

# ==========================================
# 5. 运行模拟
# ==========================================
if __name__ == "__main__":
    GLOBAL_USER_ID = "user_999"
    # 演示1: Checkpointer 的作用
    config_session_a = {"configurable": {"thread_id": "thread_A", "user_id": GLOBAL_USER_ID}}
    input_msg = {"messages": [HumanMessage(content="你好,我叫 Gemini")]}
    for event in app.stream(input_msg, config=config_session_a):
        pass
    final_state = app.get_state(config_session_a)
    print(f"AI 最后回复: {final_state.values['messages'][-1].content}")
    # 演示2: 同一 thread_id 继续对话
    input_msg_continue = {"messages": [HumanMessage(content="我刚才说了什么?")]}
    for event in app.stream(input_msg_continue, config=config_session_a):
        pass
    final_state_continue = app.get_state(config_session_a)
    print(f"AI 最后回复: {final_state_continue.values['messages'][-1].content}")
    # 演示3: 不同 thread_id 隔离
    config_session_b = {"configurable": {"thread_id": "thread_B", "user_id": GLOBAL_USER_ID}}
    input_msg_2 = {"messages": [HumanMessage(content="你还记得我是谁吗?")]}
    for event in app.stream(input_msg_2, config=config_session_b):
        pass
    final_state_b = app.get_state(config_session_b)
    print(f"AI 最后回复: {final_state_b.values['messages'][-1].content}")
    # 验证 Store 共享
    print(f"Store 中存储的数据: {long_term_store.get(('users', GLOBAL_USER_ID), 'profile').value}")

运行结果如下:

5、题外话:对比 DeepSeek 和 Gemini 的个人记忆

这里聊点有意思的——其实和 LangGraph 不太直接相关,纯粹是好奇现在主流大模型关于记忆是怎么处理的。同时问了 DeepSeek 和 Gemini 相同的问题:“我在xxx的记忆有哪些?”

从实际使用来看,DeepSeek 目前应该没有开启记忆功能,而 Gemini 是有的。截图如下:

03

LangGraph Agent 可视化 —— LangSmith

1、LangSmith 介绍

基于 LangChain 官方文档(LangSmith 部分),我们可以将 LangSmith 定义为:一个用于 LLM 应用全生命周期的 DevOps 平台。如果说 LangChain/LangGraph 是用来“造车”的(构建应用),那么 LangSmith 就是“仪表盘 + 维修车间 + 试车场”(监控、调试、评估)。

在智能体开发中,我们经常面临“黑盒”问题:为什么这个 Agent 在这一步卡住了?为什么这次回答的质量突然变差了?这个复杂的 LangGraph 图到底走了哪条分支?LangSmith 就是为了解决这些可观测性(Observability)和评估(Evaluation)难题而生的。主要功能有三个:

  • Tracing(全链路追踪/调试)
  • Evaluation(评估/测试)
  • Monitoring(生产监控)

目前大部分核心功能可以免费试用,有使用量(额度)和数据保留时间的限制,但用于学习、开发调试和个人项目来说,应该是够用的。

2、LangSmith 使用介绍

1)配置方法

2)启动应用

3)登录 LangSmith 页面(https://smith.langchain.com/),选择绑定的项目名,查看页面提供的相关功能。页面具体功能这里就不逐一分析了,提供几个截图供参考:

登录初始页面

历史上跑过的一些自主规划 Agent 数据

打开指定链路,查看每个 LangGraph 节点的耗时等监控数据

查看其他监控相关数据

04

结语

关于 LangGraph 系列的学习总结文章就介绍得差不多了。最终还是要能落地到生产环境,能解决问题、有业务价值的才是有意义的,不然就是空谈。未来如果在生产环境真正落地后,再来做一些相关的分享。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策