Python人机交互代码实战:DeepAgents HITL实现详解
描述
在智能体(Agent)执行高风险操作(如文件删除、邮件发送)时,引入“人在回路”(Human-in-the-Loop, HITL)机制是保障安全的关键策略。该机制要求关键操作必须经过人工确认或干预后方可执行。
虽然LangChain文档提供了基础的HITL实现,但其同步输出模式难以满足交互式应用对实时性的要求。本文将基于DeepAgents框架,详细讲解如何构建一个支持同步流式输出的HITL方案,以实时呈现智能体的思考过程与工具调用状态。
依赖
除上述核心库外,实现中还使用了项目基础设施层的日志管理模块与配置管理模块。这两个工具类的具体实现可参考相关技术文档。
代码:DeepAgentManager
整体设计
方案的核心是DeepAgentManager管理类,其设计目标是封装智能体的创建、流式对话及中断恢复逻辑,对外提供简洁的API接口。
该类包含两个核心方法:stream_chat与stream_resume。所有流式输出数据均被封装为JSON对象,便于前端解析。主要包含以下几种消息类型:
类型
数据
示例
summarization
自动摘要中... \n
text
聊天文本
{'type': 'text', 'content': ' for'}
tool_call
工具调用:*** \n
{'type': 'tool_call', 'content': '工具调用:delete_file \n'}
tool_output
工具响应: ** \n
{'type': 'tool_output', 'content': '工具响应: Deleted /temp.txt \n'}
interrupt
调用方法中断
{'type': 'interrupt', 'content': '{"action_requests": [{"name": "delete_file", "args": {"path": "/temp.txt"}, "description": "Tool execution requires approval\\n\\nTool: delete_file\\nArgs: {\'path\': \'/temp.txt\'}"}], "review_configs": [{"action_name": "delete_file", "allowed_decisions": ["approve", "edit", "reject", "respond"]}]}'}
处理这些消息类型时,需注意以下三点:
第一,summarization类型。当对话历史触发自动摘要时,流中可能连续返回多条此类消息。前端展示时需妥善处理,避免干扰用户。
第二,所有content字段均为字符串。特别是interrupt类型,其content是一个JSON字符串,需通过json.loads()解析为Python字典后,才能获取其中的中断请求(action_requests)与审核配置(review_configs)。
第三,明确两个核心方法的分工:
方法名称
方法描述
stream_chat
基本的聊天方法,流式返回消息
stream_resume
中断重启方法,stream_chat中返回了interrupt消息,可以由该方法继续执行后续操作
简而言之,stream_chat是发起对话的入口,而stream_resume是在对话因人工审核中断后,根据人工决策(批准、拒绝等)继续执行的“重启键”。
工具demo
为演示HITL机制,我们定义几个工具,并为它们配置不同的中断策略。
在interrupt_on配置中:
delete_file: True表示删除文件操作默认触发中断,等待人工审核。read_file: False表示读取文件操作无需中断,可直接执行。send_email: {\"allowed_decisions\": [\"approve\", \"reject\"]}表示发送邮件操作会触发中断,且人工仅可做出“批准”或“拒绝”的决策。
这种灵活的配置方式,允许我们精确控制哪些高风险操作需要人工介入。
详细代码
以下是DeepAgentManager类的完整实现。代码结构清晰,核心逻辑集中于流式数据的处理与分发。
代码的关键在于两个静态方法 _deal_messages_chunk 和 _deal_updates_chunk。它们作为数据分类器,负责将DeepAgents框架返回的原始数据块(chunk),根据其类型(普通消息流或状态更新流)解析并封装为标准JSON格式。主方法 stream_chat 和 stream_resume 则驱动整个流式过程,并捕获异常以确保服务健壮性。
示例
最后,我们通过一个完整示例来演示该机制的运作流程。
运行此示例,你将观察到以下流程:
- 首先进行普通问答(“法国首都是哪儿”),流式输出文本回答。
- 用户发出“删除文件”指令。由于
delete_file工具配置了人工审核,流式输出中将包含一条interrupt类型消息。 - 程序解析该中断消息,获取待审核动作(action_requests)与可执行决策(review_configs)。
- 模拟人工做出“批准”(approve)决策,并将决策列表传递给
stream_resume方法。 stream_resume方法根据决策结果,继续执行流式输出,最终完成文件删除操作并返回工具执行结果。
整个过程清晰地展示了从对话触发中断,到人工介入决策,再到智能体根据决策继续执行的完整HITL闭环。这种流式实现使得前端能够实时展示每个步骤,从而提供更流畅的用户体验。
