年最新系统架构设计深度对比:LangGraph状态机与多源异构RAG全面解析
RAG 系统架构设计:基于 LangGraph 的多源异构 RAG 系统
在企业级知识管理的实际场景中,知识来源从来都不是单一、规整的。本地散落着大量 PDF 文档,语雀上维护着协作更新的在线文档,Gitee 代码仓库里躺着不断迭代的工程代码——这些知识源格式各异、存储分散,甚至连语义空间都不一致。真正要把它们统一管理起来,并提供精准的问答能力,恰恰是 RAG 系统面对的最棘手的挑战之一。
这套系统名为 O-RAG,专门为多源异构知识场景设计。它支持本地文档、语雀文档、代码仓库这三类典型知识源的统一导入和联合检索。下面,我们就从系统架构、工作流编排、状态管理等几个核心维度,拆解一下它的设计思路和实现细节。
一、技术栈选型
| 组件类型 | 技术选型 | 选型理由 |
|---|---|---|
| Web 框架 | FastAPI + Uvicorn | 高性能异步框架,原生支持 SSE 流式输出 |
| RAG 编排 | LangGraph | 基于状态图的 DAG 工作流,支持条件路由和并行分支 |
| 向量数据库 | Qdrant | 原生支持稠密+稀疏双向量混合检索,RRF 融合 |
| 图数据库 | Neo4j | 构建跨知识源的语义关系图谱 |
| 对象存储 | MinIO | 存储原始文件和图片资源 |
| 历史记录 | MySQL | 持久化对话历史 |
| Embedding 模型 | BGE-M3 | 原生支持稠密+稀疏向量,L2 归一化 |
| Rerank 模型 | BGE-Reranker | 精排模型,支持动态 TopK |
| 前端 UI | Element Plus | Vue 组件库,三标签页设计 |
二、系统整体架构
O-RAG 在架构上做了一个很关键的分手——将导入和查询拆成两个独立服务。
┌─────────────────────────────────────────────────────────────────┐
│用户界面(Element Plus) │
│ 三标签页:本地文档 | 语雀文档 | 代码仓库 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ query_process 服务(端口 8001) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 查询工作流(LangGraph) │ │
│ │ item_name_confirm → search_embedding ─┐ │ │
│ │ → search_hyde ├→ RRF → Rerank │ │
│ │ → web_search │ │ │
│ │ → query_kg ──────────┘ │ │
│ │ → Answer │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ import_process 服务(端口 8000) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 导入工作流(LangGraph) │ │
│ │ entry → pdf_to_md → md_img → document_split │ │
│ │ → item_name_recognition → bge_embedding │ │
│ │ → import_qdrant → import_kg │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 存储层 │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Qdrant │ │ Neo4j │ │ MinIO │ │ MySQL │ │ 本地 │ │
│ │ 向量库 │ │ 图谱库 │ │ 对象库 │ │ 历史库 │ │ 文件 │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────────┘
这套设计背后有几个核心理念值得留意:
- 导入与查询分离:
import_process专心做知识导入流水线,query_process负责检索和生成。职责拆清楚了,各自才能独立演进和扩展。 - 统一前端入口:所有 UI 页面都在
query_process这一侧管理,避免了前端碎片化的问题。 - 三模数据源隔离:在 Qdrant 里,不同数据源使用独立的集合(
kb_docs、kb_yuque、kb_code),语义空间各自独立,生命周期也可以单独管理。
三、LangGraph 状态机编排
LangGraph 是 LangChain 生态里用来编排工作流的框架,核心思路是用状态图(StateGraph)来控制 DAG 流程。O-RAG 用它来编排导入和查询这两条核心流水线。
3.1 状态定义:TypedDict 类型安全
每个工作流都有一个 TypedDict 状态类,定义了流程中流转的所有数据字段:
class QueryGraphState(TypedDict):
"""查询流程状态"""
session_id: str # 会话唯一标识
original_query: str # 用户原始问题
# 检索类型控制(三模支持)
search_type: str # auto/doc/code/all/cross_kb
source_types: List[str] # 指定数据源类型列表
# 检索过程中的中间数据
embedding_chunks: list # 普通向量检索结果
hyde_embedding_chunks: list # HyDE 检索结果
web_search_docs: list # 网络搜索结果
kg_docs: list # 知识图谱查询结果
cross_kb_docs: list # 跨库链路检索结果
# 排序与生成
rrf_chunks: list # RRF 融合排序后
reranked_docs: list # Rerank 精排后
answer: str # 最终答案
这种设计的优势很明显:类型安全(TypedDict 提供代码补全和类型检查)、状态共享(所有节点通过 state 字典读写数据,无需全局变量)、以及深拷贝隔离(create_default_state() 用 deepcopy 避免状态污染)。
3.2 导入工作流:从 PDF 到向量库
导入工作流处理的是知识入库的完整链路:
entry → pdf_to_md → md_img → document_split → item_name_recognition
→ bge_embedding → import_qdrant → import_kg → END
根据文件类型,系统会走不同的路径:
def route_after_entry(state: ImportGraphState) -> str:
"""根据文件类型路由"""
if state["is_pdf_read_enabled"]:
return "node_pdf_to_md" # PDF → 转 Markdown
elif state["is_md_read_enabled"]:
return "node_md_img" # MD → 直接处理图片
else:
return END # 无效文件 → 结束
每个核心节点的职责和技术实现如下:
| 节点 | 职责 | 技术实现 |
|---|---|---|
node_pdf_to_md |
PDF 转 Markdown | MinerU 模型 |
node_md_img |
提取/下载 MD 中的图片 | MinIO 存储 |
node_document_split |
文档切分 | 标题切割 + 长度控制 |
node_item_name_recognition |
提取商品名 | LLM 识别 |
node_bge_embedding |
生成向量 | BGE-M3 模型 |
node_import_qdrant |
向量入库 | Qdrant 集合 |
node_import_kg |
知识图谱导入 | Neo4j 节点/关系 |
3.3 查询工作流:多路召回与融合
查询工作流是 O-RAG 的核心,实现了多路并行召回 + 融合排序 + 精排生成的完整链路:
item_name_confirm ─┬→ search_embedding ────┐
├→ search_embedding_hyde ├→ RRF → Rerank → Answer
├→ web_search_ta vily │
├→ query_kg ──────────────┘
└→ cross_kb_search ──────┘ (条件路由)
条件路由根据 search_type 分流:
def route_after_confirm(state: QueryGraphState) -> str:
"""根据检索类型路由"""
search_type = state.get("search_type", "all")
if search_type == "cross_kb":
return "cross_kb" # 跨库链路检索
return "normal" # 普通多路检索
在普通检索路径下,4 个检索节点会并行执行:
# 普通检索路径:多路并行召回
builder.add_edge("node_search_embedding", "node_rrf")
builder.add_edge("node_search_embedding_hyde", "node_rrf")
builder.add_edge("node_web_search_ta vily", "node_rrf")
builder.add_edge("node_query_kg", "node_rrf")
四、三模数据源管理
O-RAG 支持的这三类知识源,在 Qdrant 里有各自的独立集合:
class SourceType(str, Enum):
LOCAL_DOC = "local_doc" # 本地上传文档
YUQUE_DOC = "yuque_doc" # 语雀在线文档
CODE_REPO = "code_repo" # 代码仓库
# 数据源类型到 Qdrant 集合的映射
SOURCE_TYPE_COLLECTION_MAP = {
SourceType.LOCAL_DOC: "kb_docs",
SourceType.YUQUE_DOC: "kb_yuque",
SourceType.CODE_REPO: "kb_code",
}
分集合设计的优势:
- 语义空间隔离:不同数据源的向量空间独立,避免语义混淆。
- 生命周期独立:可以单独清理某个数据源的数据,不影响其他。
- 灵活检索:支持单集合、多集合、跨库链路三种检索模式。
4.1 语雀文档导入
语雀文档通过 API 客户端拉取,经过清洗后入库:
# app/import_process/processors/yuque_cleaner.py
def clean_yuque_content(content: str) -> str:
"""语雀内容清洗"""
# 去除 HTML 注释
content = re.sub(r'', '', content, flags=re.DOTALL)
# 去除 CDN 参数
content = re.sub(r'?x-oss-process=.*?$', '', content, flags=re.MULTILINE)
# 去除锚点标记
content = re.sub(r'', '', content)
return content
4.2 代码仓库导入
代码仓库通过 Gitee API 抓取,经过 AST 解析后构建代码图谱:
# app/import_process/processors/ast_parser.py
class ASTParser:
"""多语言 AST 解析器"""
def parse(self, code: str, language: str, file_path: str) -> List[CodeSymbol]:
if language == "python":
return self._parse_python(code, file_path) # 使用 ast 模块
elif language in ("ja vascript", "typescript"):
return self._parse_js(code, language, file_path) # 正则解析
elif language == "ja va":
return self._parse_ja va(code, file_path)
elif language == "go":
return self._parse_go(code, file_path)
代码图谱的节点类型包括 CodeFile(文件)、CodeClass(类)、CodeFunction(函数/方法)。关系类型则涵盖了 CONTAINS(包含)、INHERITS(继承)、CALLS(调用)、IMPORTS(导入)。
五、文档切分策略
文档切分是 RAG 系统绕不开的关键环节,直接影响检索质量。O-RAG 采用两阶段切分策略。
5.1 第一阶段:语义粗切(按标题)
def step_2_split_by_title(md_content, file_title):
"""根据 Markdown 标题进行语义切割"""
title_pattern = r'^\s*#{1,6}\s+.+'
lines = md_content.split('\n')
is_code_block = False
for line in lines:
strip_line = line.strip()
# 判断代码块状态(避免误识别代码注释为标题)
if strip_line.startswith('```') or strip_line.startswith('~~~'):
is_code_block = not is_code_block
# 判断是否为标题
is_title = (not is_code_block) and re.match(title_pattern, strip_line)
if is_title:
# 保存当前标题的内容,开始新标题
if current_title:
sections.append({"title": current_title, "content": ...})
current_title = strip_line
current_lines = [current_title]
这里有个特别的设计亮点:代码块感知。通过状态标记,避免将代码注释误识别为标题,保证了切分的准确性。按标题切割也能让每个 Chunk 保持语义完整。
5.2 第二阶段:精细控制(长度+重叠)
def step_3_refine_chunks(sections, max_length, min_length):
"""精细控制 Chunk 大小"""
# 1. 超长段落二次切割
for section in sections:
sub_section = split_long_section(section, max_length)
final_sections.extend(sub_section)
# 2. 过短段落合并(同一 parent_title)
final_sections = merge_short_sections(final_sections, min_length)
参数配置:
DEFAULT_MAX_CONTENT_LENGTH = 2000:单个 Chunk 最大字符数MIN_CONTENT_LENGTH = 500:短 Chunk 合并阈值chunk_overlap = 100:二次切割的重叠长度
六、BGE-M3 向量化
O-RAG 采用 BGE-M3 模型生成稠密+稀疏混合向量,这是实现混合检索的基础:
# app/lm/embedding_utils.py
def generate_embeddings(texts):
"""生成稠密+稀疏混合向量"""
model = get_bge_m3_ef() # 单例模式
embeddings = model.encode_documents(texts)
# 处理稀疏向量(CSR 格式 → 字典格式)
processed_sparse = []
for i in range(len(texts)):
sparse_indices = embeddings["sparse"].indices[...].tolist()
sparse_data = embeddings["sparse"].data[...].tolist()
sparse_dict = {k: v for k, v in zip(sparse_indices, sparse_data)}
processed_sparse.append(sparse_dict)
return {
"dense": [emb.tolist() for emb in embeddings["dense"]],
"sparse": processed_sparse
}
技术亮点:
- 原生 L2 归一化:开启
normalize_embeddings=True,单位化后内积等价于余弦相似度。 - NumPy 类型转换:
np.int64 → int、np.float32 → float,解决了序列化问题。 - 单例模式:模型只加载一次,避免重复初始化。
七、Qdrant 混合检索
Qdrant 原生支持稠密+稀疏双向量混合检索,通过 RRF 融合两路结果:
# app/clients/qdrant_utils.py
def hybrid_search(client, collection_name, dense_vector, sparse_vector, ...):
"""稠密+稀疏向量混合搜索"""
# 构建预检索参数
dense_prefetch = Prefetch(
query=dense_vector,
using="dense",
limit=max(limit * 4, 20) # 预召回 4 倍
)
sparse_prefetch = Prefetch(
query=qdrant_sparse,
using="sparse",
limit=max(limit * 4, 20)
)
# 执行混合搜索,使用 RRF 融合
results = client.query_points(
collection_name=collection_name,
prefetch=[dense_prefetch, sparse_prefetch],
query=FusionQuery(fusion=Fusion.RRF),
limit=limit,
query_filter=query_filter
)
多集合并行检索:
def multi_collection_hybrid_search(client, collection_names, ...):
"""多集合并行混合搜索"""
all_results = []
for collection_name in collection_names:
results = hybrid_search(...)
# 标记来源集合
for point in results:
point.payload["_collection"] = collection_name
all_results.extend(results)
# 按分数降序排序,取 top-k
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:limit]
八、SSE 流式输出
用户体验方面,O-RAG 支持 SSE 流式输出,避免了等待完整响应的枯燥感:
# app/query_process/agent/nodes/node_answer_output.py
def step_3_create_answer(state, prompt):
"""流式或非流式生成答案"""
model = get_llm_client()
is_stream = state.get("is_stream", False)
if is_stream:
# 流式输出:逐 token 推送到前端
for chunk in model.stream(prompt):
delta = chunk.content
answer += delta
push_to_session(state["session_id"], SSEEvent.DELTA, {"delta": delta})
else:
# 非流式:一次性返回
response = model.invoke(prompt)
set_task_result(state["session_id"], "answer", response.content)
SSE 事件类型包括:
DELTA:增量文本PROGRESS:节点执行进度FINAL:最终答案(包含图片 URL)
九、总结
回过头来看,O-RAG 系统的核心设计思路其实很清晰:
- 双服务分离:导入与查询职责分离,独立扩展。
- LangGraph 编排:基于状态图的 DAG 工作流,支持条件路由和并行分支。
- 三模数据源隔离:Qdrant 分集合存储,语义空间独立。
- 混合向量检索:BGE-M3 稠密+稀疏向量,Qdrant RRF 融合。
- 多路召回融合:向量检索、HyDE、Web 搜索、知识图谱多路并行,RRF 融合排序。
这套架构为后续的多路召回、跨库检索等高级特性打下了扎实的基础。下一篇我们会深入探讨 RRF 融合算法和 Rerank 精排的工程实践。
