年最新系统架构设计深度对比:LangGraph状态机与多源异构RAG全面解析

2026-06-20阅读 0热度 0
架构设计

RAG 系统架构设计:基于 LangGraph 的多源异构 RAG 系统

在企业级知识管理的实际场景中,知识来源从来都不是单一、规整的。本地散落着大量 PDF 文档,语雀上维护着协作更新的在线文档,Gitee 代码仓库里躺着不断迭代的工程代码——这些知识源格式各异、存储分散,甚至连语义空间都不一致。真正要把它们统一管理起来,并提供精准的问答能力,恰恰是 RAG 系统面对的最棘手的挑战之一。

01-系统架构设计-LangGraph状态机与多源异构RAG

这套系统名为 O-RAG,专门为多源异构知识场景设计。它支持本地文档、语雀文档、代码仓库这三类典型知识源的统一导入和联合检索。下面,我们就从系统架构、工作流编排、状态管理等几个核心维度,拆解一下它的设计思路和实现细节。

一、技术栈选型

组件类型 技术选型 选型理由
Web 框架 FastAPI + Uvicorn 高性能异步框架,原生支持 SSE 流式输出
RAG 编排 LangGraph 基于状态图的 DAG 工作流,支持条件路由和并行分支
向量数据库 Qdrant 原生支持稠密+稀疏双向量混合检索,RRF 融合
图数据库 Neo4j 构建跨知识源的语义关系图谱
对象存储 MinIO 存储原始文件和图片资源
历史记录 MySQL 持久化对话历史
Embedding 模型 BGE-M3 原生支持稠密+稀疏向量,L2 归一化
Rerank 模型 BGE-Reranker 精排模型,支持动态 TopK
前端 UI Element Plus Vue 组件库,三标签页设计

二、系统整体架构

O-RAG 在架构上做了一个很关键的分手——将导入和查询拆成两个独立服务。

┌─────────────────────────────────────────────────────────────────┐ │用户界面(Element Plus) │ │ 三标签页:本地文档 | 语雀文档 | 代码仓库 │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ query_process 服务(端口 8001) │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 查询工作流(LangGraph) │ │ │ │ item_name_confirm → search_embedding ─┐ │ │ │ │ → search_hyde ├→ RRF → Rerank │ │ │ │ → web_search │ │ │ │ │ → query_kg ──────────┘ │ │ │ │ → Answer │ │ │ └──────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ import_process 服务(端口 8000) │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 导入工作流(LangGraph) │ │ │ │ entry → pdf_to_md → md_img → document_split │ │ │ │ → item_name_recognition → bge_embedding │ │ │ │ → import_qdrant → import_kg │ │ │ └──────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 存储层 │ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ Qdrant │ │ Neo4j │ │ MinIO │ │ MySQL │ │ 本地 │ │ │ │ 向量库 │ │ 图谱库 │ │ 对象库 │ │ 历史库 │ │ 文件 │ │ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │ └─────────────────────────────────────────────────────────────────┘

这套设计背后有几个核心理念值得留意:

  1. 导入与查询分离import_process 专心做知识导入流水线,query_process 负责检索和生成。职责拆清楚了,各自才能独立演进和扩展。
  2. 统一前端入口:所有 UI 页面都在 query_process 这一侧管理,避免了前端碎片化的问题。
  3. 三模数据源隔离:在 Qdrant 里,不同数据源使用独立的集合(kb_docskb_yuquekb_code),语义空间各自独立,生命周期也可以单独管理。

三、LangGraph 状态机编排

LangGraph 是 LangChain 生态里用来编排工作流的框架,核心思路是用状态图(StateGraph)来控制 DAG 流程。O-RAG 用它来编排导入和查询这两条核心流水线。

3.1 状态定义:TypedDict 类型安全

每个工作流都有一个 TypedDict 状态类,定义了流程中流转的所有数据字段:

class QueryGraphState(TypedDict): """查询流程状态""" session_id: str # 会话唯一标识 original_query: str # 用户原始问题 # 检索类型控制(三模支持) search_type: str # auto/doc/code/all/cross_kb source_types: List[str] # 指定数据源类型列表 # 检索过程中的中间数据 embedding_chunks: list # 普通向量检索结果 hyde_embedding_chunks: list # HyDE 检索结果 web_search_docs: list # 网络搜索结果 kg_docs: list # 知识图谱查询结果 cross_kb_docs: list # 跨库链路检索结果 # 排序与生成 rrf_chunks: list # RRF 融合排序后 reranked_docs: list # Rerank 精排后 answer: str # 最终答案

这种设计的优势很明显:类型安全(TypedDict 提供代码补全和类型检查)、状态共享(所有节点通过 state 字典读写数据,无需全局变量)、以及深拷贝隔离(create_default_state()deepcopy 避免状态污染)。

3.2 导入工作流:从 PDF 到向量库

导入工作流处理的是知识入库的完整链路:

entry → pdf_to_md → md_img → document_split → item_name_recognition → bge_embedding → import_qdrant → import_kg → END

根据文件类型,系统会走不同的路径:

def route_after_entry(state: ImportGraphState) -> str: """根据文件类型路由""" if state["is_pdf_read_enabled"]: return "node_pdf_to_md" # PDF → 转 Markdown elif state["is_md_read_enabled"]: return "node_md_img" # MD → 直接处理图片 else: return END # 无效文件 → 结束

每个核心节点的职责和技术实现如下:

节点 职责 技术实现
node_pdf_to_md PDF 转 Markdown MinerU 模型
node_md_img 提取/下载 MD 中的图片 MinIO 存储
node_document_split 文档切分 标题切割 + 长度控制
node_item_name_recognition 提取商品名 LLM 识别
node_bge_embedding 生成向量 BGE-M3 模型
node_import_qdrant 向量入库 Qdrant 集合
node_import_kg 知识图谱导入 Neo4j 节点/关系

3.3 查询工作流:多路召回与融合

查询工作流是 O-RAG 的核心,实现了多路并行召回 + 融合排序 + 精排生成的完整链路:

item_name_confirm ─┬→ search_embedding ────┐ ├→ search_embedding_hyde ├→ RRF → Rerank → Answer ├→ web_search_ta vily │ ├→ query_kg ──────────────┘ └→ cross_kb_search ──────┘ (条件路由)

条件路由根据 search_type 分流:

def route_after_confirm(state: QueryGraphState) -> str: """根据检索类型路由""" search_type = state.get("search_type", "all") if search_type == "cross_kb": return "cross_kb" # 跨库链路检索 return "normal" # 普通多路检索

在普通检索路径下,4 个检索节点会并行执行:

# 普通检索路径:多路并行召回 builder.add_edge("node_search_embedding", "node_rrf") builder.add_edge("node_search_embedding_hyde", "node_rrf") builder.add_edge("node_web_search_ta vily", "node_rrf") builder.add_edge("node_query_kg", "node_rrf")

四、三模数据源管理

O-RAG 支持的这三类知识源,在 Qdrant 里有各自的独立集合:

class SourceType(str, Enum): LOCAL_DOC = "local_doc" # 本地上传文档 YUQUE_DOC = "yuque_doc" # 语雀在线文档 CODE_REPO = "code_repo" # 代码仓库 # 数据源类型到 Qdrant 集合的映射 SOURCE_TYPE_COLLECTION_MAP = { SourceType.LOCAL_DOC: "kb_docs", SourceType.YUQUE_DOC: "kb_yuque", SourceType.CODE_REPO: "kb_code", }

分集合设计的优势:

  1. 语义空间隔离:不同数据源的向量空间独立,避免语义混淆。
  2. 生命周期独立:可以单独清理某个数据源的数据,不影响其他。
  3. 灵活检索:支持单集合、多集合、跨库链路三种检索模式。

4.1 语雀文档导入

语雀文档通过 API 客户端拉取,经过清洗后入库:

# app/import_process/processors/yuque_cleaner.py def clean_yuque_content(content: str) -> str: """语雀内容清洗""" # 去除 HTML 注释 content = re.sub(r'', '', content, flags=re.DOTALL) # 去除 CDN 参数 content = re.sub(r'?x-oss-process=.*?$', '', content, flags=re.MULTILINE) # 去除锚点标记 content = re.sub(r'', '', content) return content

4.2 代码仓库导入

代码仓库通过 Gitee API 抓取,经过 AST 解析后构建代码图谱:

# app/import_process/processors/ast_parser.py class ASTParser: """多语言 AST 解析器""" def parse(self, code: str, language: str, file_path: str) -> List[CodeSymbol]: if language == "python": return self._parse_python(code, file_path) # 使用 ast 模块 elif language in ("ja vascript", "typescript"): return self._parse_js(code, language, file_path) # 正则解析 elif language == "ja va": return self._parse_ja va(code, file_path) elif language == "go": return self._parse_go(code, file_path)

代码图谱的节点类型包括 CodeFile(文件)、CodeClass(类)、CodeFunction(函数/方法)。关系类型则涵盖了 CONTAINS(包含)、INHERITS(继承)、CALLS(调用)、IMPORTS(导入)。

五、文档切分策略

文档切分是 RAG 系统绕不开的关键环节,直接影响检索质量。O-RAG 采用两阶段切分策略。

5.1 第一阶段:语义粗切(按标题)

def step_2_split_by_title(md_content, file_title): """根据 Markdown 标题进行语义切割""" title_pattern = r'^\s*#{1,6}\s+.+' lines = md_content.split('\n') is_code_block = False for line in lines: strip_line = line.strip() # 判断代码块状态(避免误识别代码注释为标题) if strip_line.startswith('```') or strip_line.startswith('~~~'): is_code_block = not is_code_block # 判断是否为标题 is_title = (not is_code_block) and re.match(title_pattern, strip_line) if is_title: # 保存当前标题的内容,开始新标题 if current_title: sections.append({"title": current_title, "content": ...}) current_title = strip_line current_lines = [current_title]

这里有个特别的设计亮点:代码块感知。通过状态标记,避免将代码注释误识别为标题,保证了切分的准确性。按标题切割也能让每个 Chunk 保持语义完整。

5.2 第二阶段:精细控制(长度+重叠)

def step_3_refine_chunks(sections, max_length, min_length): """精细控制 Chunk 大小""" # 1. 超长段落二次切割 for section in sections: sub_section = split_long_section(section, max_length) final_sections.extend(sub_section) # 2. 过短段落合并(同一 parent_title) final_sections = merge_short_sections(final_sections, min_length)

参数配置:

  • DEFAULT_MAX_CONTENT_LENGTH = 2000:单个 Chunk 最大字符数
  • MIN_CONTENT_LENGTH = 500:短 Chunk 合并阈值
  • chunk_overlap = 100:二次切割的重叠长度

六、BGE-M3 向量化

O-RAG 采用 BGE-M3 模型生成稠密+稀疏混合向量,这是实现混合检索的基础:

# app/lm/embedding_utils.py def generate_embeddings(texts): """生成稠密+稀疏混合向量""" model = get_bge_m3_ef() # 单例模式 embeddings = model.encode_documents(texts) # 处理稀疏向量(CSR 格式 → 字典格式) processed_sparse = [] for i in range(len(texts)): sparse_indices = embeddings["sparse"].indices[...].tolist() sparse_data = embeddings["sparse"].data[...].tolist() sparse_dict = {k: v for k, v in zip(sparse_indices, sparse_data)} processed_sparse.append(sparse_dict) return { "dense": [emb.tolist() for emb in embeddings["dense"]], "sparse": processed_sparse }

技术亮点:

  1. 原生 L2 归一化:开启 normalize_embeddings=True,单位化后内积等价于余弦相似度。
  2. NumPy 类型转换np.int64 → intnp.float32 → float,解决了序列化问题。
  3. 单例模式:模型只加载一次,避免重复初始化。

七、Qdrant 混合检索

Qdrant 原生支持稠密+稀疏双向量混合检索,通过 RRF 融合两路结果:

# app/clients/qdrant_utils.py def hybrid_search(client, collection_name, dense_vector, sparse_vector, ...): """稠密+稀疏向量混合搜索""" # 构建预检索参数 dense_prefetch = Prefetch( query=dense_vector, using="dense", limit=max(limit * 4, 20) # 预召回 4 倍 ) sparse_prefetch = Prefetch( query=qdrant_sparse, using="sparse", limit=max(limit * 4, 20) ) # 执行混合搜索,使用 RRF 融合 results = client.query_points( collection_name=collection_name, prefetch=[dense_prefetch, sparse_prefetch], query=FusionQuery(fusion=Fusion.RRF), limit=limit, query_filter=query_filter )

多集合并行检索:

def multi_collection_hybrid_search(client, collection_names, ...): """多集合并行混合搜索""" all_results = [] for collection_name in collection_names: results = hybrid_search(...) # 标记来源集合 for point in results: point.payload["_collection"] = collection_name all_results.extend(results) # 按分数降序排序,取 top-k all_results.sort(key=lambda x: x.score, reverse=True) return all_results[:limit]

八、SSE 流式输出

用户体验方面,O-RAG 支持 SSE 流式输出,避免了等待完整响应的枯燥感:

# app/query_process/agent/nodes/node_answer_output.py def step_3_create_answer(state, prompt): """流式或非流式生成答案""" model = get_llm_client() is_stream = state.get("is_stream", False) if is_stream: # 流式输出:逐 token 推送到前端 for chunk in model.stream(prompt): delta = chunk.content answer += delta push_to_session(state["session_id"], SSEEvent.DELTA, {"delta": delta}) else: # 非流式:一次性返回 response = model.invoke(prompt) set_task_result(state["session_id"], "answer", response.content)

SSE 事件类型包括:

  • DELTA:增量文本
  • PROGRESS:节点执行进度
  • FINAL:最终答案(包含图片 URL)

九、总结

回过头来看,O-RAG 系统的核心设计思路其实很清晰:

  1. 双服务分离:导入与查询职责分离,独立扩展。
  2. LangGraph 编排:基于状态图的 DAG 工作流,支持条件路由和并行分支。
  3. 三模数据源隔离:Qdrant 分集合存储,语义空间独立。
  4. 混合向量检索:BGE-M3 稠密+稀疏向量,Qdrant RRF 融合。
  5. 多路召回融合:向量检索、HyDE、Web 搜索、知识图谱多路并行,RRF 融合排序。

这套架构为后续的多路召回、跨库检索等高级特性打下了扎实的基础。下一篇我们会深入探讨 RRF 融合算法和 Rerank 精排的工程实践。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策