事件驱动智能体构建实战:基于云端API从ReAct到Workflow完整教程
核心摘要 (TL;DR)
- 架构进阶:从传统的 ReAct 循环升级为 LlamaIndex Workflow,掌握基于“事件驱动”的现代智能体编排范式。
- 算力自由:摆脱本地 GPU 显存与运行时间的限制,实战接入 Groq 与硅基流动的高性能免费大模型 API。
- 实战目标:利用 Workflow 重构博客监控程序,将其拆解为“检查员 (Checker)”与“通知官 (Notifier)”,实现多智能体的解耦与协作。
前言
之前我们用本地加载的 Hugging Face 模型跑了个 ReActAgent,完成了任务。但说实话,LlamaIndex 的“正经”玩法其实是 Workflow,而且更推荐用 API 调用,而不是本地硬扛模型。所以这篇就来聊聊 LlamaIndex 的 Workflow 流程,顺便带大家摆脱对 GPU 的依赖。
1. 什么是WorkFlow
之前咱们那种用法其实属于 QueryEngine 的范畴,就是把大模型当成一个查询工具。而 Workflow 是 LlamaIndex 推出的新一代编排引擎,概念上完全不一样。
1.1 核心逻辑
LlamaIndex 的 Workflow,本质上是一个事件驱动(Event-driven)的状态机。经常做开发的朋友应该一听就懂。咱们可以把整个流程想象成一场“接力赛”:
- 选手(Steps):在代码里用
@step装饰的函数,每个函数就是一个 step,负责具体的逻辑。 - 接力棒(Events):自定义的类,继承自
Event,用于在 Step 之间传递数据。 - 终点线(StopEvent):和普通
Event一样,只是作为最后一棒。某个 Step 返回StopEvent就意味着“接力赛”结束,把结果返回给用户。
1.2 核心组件
- Event
这是 Workflow 的灵魂。可以定义各种 Event,它们决定了数据流向哪里。
from llama_index.core.workflow import Event
class NewsExtractedEvent(Event):
content: str # 这里可以随便定义内容
xxx: int # 多少条都可以
@step
逻辑处理单元。一个 Step 可以通过返回值来“发射”事件。- 生产者:返回一个
Event实例。 - 消费者:在下一个 Step 的函数参数中声明对应的
Event类型。
- 生产者:返回一个
- StartEvent & StopEvent
流程的“起跑线”和“终点线”。通常只需要定义StopEvent,调用workflow.run()时会自动触发StartEvent。任何 Step 只要返回StopEvent,整个 Workflow 就会停止并输出结果。
结合起来,就是一个极简的 Workflow 模板:
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event
class ProcessedDataEvent(Event):
data: str
class MyWorkflow(Workflow):
@step
async def ingest(self, ev: StartEvent) -> ProcessedDataEvent:
# 1. 接收初始输入
input_data = ev.get("input")
return ProcessedDataEvent(data=input_data.upper())
@step
async def finish(self, ev: ProcessedDataEvent) -> StopEvent:
# 2. 接收上一步处理后的数据
return StopEvent(result=f"处理完成: {ev.data}")
# 运行
wf = MyWorkflow()
await wf.run(input="hello world") # -> "处理完成: HELLO WORLD"
通常,我们会把 step 定义成一个调用 LLM 处理的函数。后面就用 Workflow 重写上一篇中的 Agent。
2. API获取
这次换换口味,不用本地加载模型,而是用 API 的方式调用大模型。所以本篇的代码可以在没有显卡的电脑上运行,完全脱离 Kaggle 的 GPU 环境。
PS: 虽然调用 API 更方便,但本地加载或部署更安全。
这里介绍两个免费模型的 API 平台:Groq 和 硅基流动。
2.1 Groq
Groq 本身不是模型开发者,而是大模型的“搬运工”——一个推理加速平台。它的核心是自研的 LPU(Language Processing Unit),推理速度快得惊人。
- Groq模型矩阵
| 模型系列 | 模型 ID (API 调用名) | 厂商 | 上下文窗口 | 特点与优势 |
|---|---|---|---|---|
| Llama 3.3 | llama-3.3-70b-versatile | Meta | 128k | 最强逻辑,性能对标 GPT-4o |
| Llama 3.1 | llama-3.1-8b-instant | Meta | 128k | 极速响应,适合简单逻辑与翻译 |
| Llama 3.2 | llama-3.2-11b-vision-preview | Meta | 128k | 多模态支持,可识别并理解图片 |
| Gemma 2 | gemma2-9b-it | 8k | 指令遵循强,结构化输出更稳定 | |
| Mixtral | mixtral-8x7b-32768 | Mistral | 32k | 经典的混合专家模型,长文本表现好 |
| Whisper | whisper-large-v3 | OpenAI | - | 极速语音转文字 (STT) |
- Groq的限制
Groq 的免费额度给得非常慷慨,堪称业界良心。但为了防止滥用,也设置了不少限制,通常是对每个模型独立计算。限制维度包括 RPM(Requests Per Minute)、RPD(Requests Per Day)、TPM(Tokens Per Minute)。对于日常开发来说,完全够用。 注册Groq Api Key
- 生成 API Key
2.2 硅基流动
硅基流动是第二个大模型“搬运工”。国内开发者用得比较多,上面有许多便宜大碗的模型,也有不少免费的模型可用。
- 硅基流动模型矩阵
| 模型系列 | 最新代表型号 (API 调用名) | 核心优势 | 上下文窗口 | 状态 |
|---|---|---|---|---|
| DeepSeek | deepseek-ai/DeepSeek-V3.2 | 性能对标顶级闭源模型,极强逻辑 | 128k | 旗舰收费 |
| DeepSeek | deepseek-ai/DeepSeek-R1 | 推理增强模型,擅长复杂数学与逻辑 | 64k | 部分免费 |
| Qwen | Qwen/Qwen3-30B-A3B | 阿里最新一代模型,中文语境指令遵循极强 | 128k | 免费/收费 |
| Qwen-VL | Qwen/Qwen3-VL-72B | 顶级视觉理解能力,支持视频分析 | 32k | 旗舰收费 |
| GLM | Z.ai/GLM-5 | 智谱旗舰,专注于长周期 Agent 任务 | 205k | 旗舰收费 |
| Kimi | moonshotai/Kimi-K2.5 | 原生多模态智能体模型,长文本处理极佳 | 262k | 旗舰收费 |
| Step | stepfun-ai/Step-3.5-Flash | 极高性价比的轻量模型,推理速度极快 | 262k | 低成本/免费 |
| MiniMax | MiniMaxAI/MiniMax-M2.5 | 擅长角色扮演与情感交互 | 197k | 旗舰收费 |
| Llama 3 | meta-llama/Llama-3.3-70B-Instruct | 国际最强开源模型,适配各类 Agent 框架 | 128k | 旗舰收费 |
- 硅基流动的限制
硅基流动的可用模型比 Groq 更多,有不少永久免费的模型,还有新用户赠金。但免费版的并发数较低,具体模型也有 RPM 和 TPM 限制。 注册硅基流动API Key
后面会使用 Qwen/Qwen3-8B 这个免费模型。
3. 代码实战
3.1 环境准备
这次和此前不同,我们用 API 不用自己加载模型,所以不用开启 GPU 加速,Accelerator 选择 None 即可。
先装依赖库:
# 1. 确保安装最新版 uv
!pip install -U uv
# 2. 使用修正后的包名进行安装
!uv pip install llama-index llama-index-llms-groq llama-index-llms-openai-like feedparser requests transformers
注意:硅基流动其实有专门的定制库 llama-index-llms-siliconflow,但这里为了通用性用了 llama-index-llms-openai-like。(实际上最初试了 Groq 效果不太好,想换中文模型试试,结果也不理想,这次主要还是介绍 Workflow 架构。)
3.2 配置密钥
和上次一样,在 Add-ons -> Secrets 中添加 Groq 和硅基流动的 API Key,分别命名为 GROQ_API_KEY 和 SILICON-API_KEY(名字随意,和代码匹配即可)。
3.3 使用API来示例llm
Tools 和之前一样,不再赘述。
- 定义Groq的模型
import os
from llama_index.llms.groq import Groq
from llama_index.core import Settings
os.environ["GROQ_API_KEY"] = user_secrets.get_secret("GROQ_API_KEY")
# 推荐使用 llama-3.3-70b,逻辑推理能力足以应对多智能体协作
Settings.llm = Groq(model="llama-3.3-70b-versatile")
- 定义硅基流动的Qwen模型
from llama_index.llms.openai_like import OpenAILike
Settings.llm = OpenAILike(
model="Qwen/Qwen3-8B",
api_key=user_secrets.get_secret("SILICON-API_KEY"),
api_base="https://api.siliconflow.cn/v1",
is_chat_model=True
)
可见,定制接口只需定义对应的 API Key 到环境变量即可,通用接口则要补充模型名、api-key、api 的 url 等。
3.4 整体流程
计划定义两个 Agent:
- Checker:负责获取最新博客并解析。
- Notifier:负责发送通知。
当有新的博客时,Checker 向 Notifier 发送事件,告知标题和链接,然后结束。
3.5 定义Event
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Context, Event
class NewBlogPostEvent(Event):
title: str
link: str
由于 Checker 流向 Notifier 只需要标题和链接,所以 NewBlogPostEvent 只包含 title 和 link 字段。
3.6 定义两个合作的Agent
先定义 Checker:需要明确其角色(ROLE)、任务(TASK)和输出要求(OUTPUT_FORMAT),并赋予其所需工具。
from llama_index.core.agent.workflow import ReActAgent
CHECKER_SYSTEM_PROMPT = """
### ROLE
咱们是一个极其精准的数据提取专家。
### TASK
1. 调用工具获取最新博客文章。
2. 提取其标题和链接。
3. **必须**以 JSON 格式输出。
### OUTPUT_FORMAT
{"title": "文章标题", "link": "文章链接"}
"""
checker = ReActAgent(
name="checker",
system_prompt=CHECKER_SYSTEM_PROMPT,
tools=[tool_get_blog],
llm=Settings.llm
)
再定义 Notifier:
NOTIFIER_SYSTEM_PROMPT = """
### ROLE
咱们是一个通知官。
### TASK
请根据提供的信息发送通知。
直接调用工具发送,不要解释咱们的行为。发送成功后回复“已发出”即可。
"""
notifier = ReActAgent(
name="notifier",
system_prompt=NOTIFIER_SYSTEM_PROMPT,
tools=[tool_send_all],
llm=Settings.llm
)
3.7 定义Workflow
- 从 Workflow 类派生出我们的 workflow 类。
- 定义 step,从
StartEvent开始到StopEvent结束。
由于模型的指令跟随能力较弱,格式化输出有些困难,这里用了一个正则来兜底。
import os
import re
import json
global_last_title = ""
class DemoBlogMonitor(Workflow):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.checker = checker
self.notifier = notifier
@step
async def check_step(self, ev: StartEvent) -> NewBlogPostEvent | StopEvent:
global global_last_title
rss_url = ev.get("rss_url")
print(f"--- [Checker] 目标: {rss_url} ---")
print(f"--- [Checker] 基准: '{global_last_title}' ---")
response = await self.checker.run(user_msg=f"请提取此 RSS 的最新数据:{rss_url}")
res_text = str(response)
print(f"DEBUG [Agent Output]: {res_text}")
try:
json_match = re.search(r"({.*})", res_text, re.DOTALL)
if not json_match:
raise ValueError("未在回复中找到 JSON 块")
data = json.loads(json_match.group(1))
current_title = data.get("title", "").strip()
current_link = data.get("link", "").strip()
if not current_title or not current_link:
raise ValueError("JSON 数据缺失字段")
# 在 Python 侧进行确定性比对
if current_title != global_last_title:
print(f"发现新动态: {current_title}")
# 只在成功触发通知时更新状态
return NewBlogPostEvent(title=current_title, link=current_link)
else:
return StopEvent(result=f"判定:内容未更新")
except Exception as e:
# 如果 JSON 解析失败,尝试最后的正则兜底
print(f"WARN: JSON 解析失败 ({e}), 尝试正则兜底...")
link_match = re.search(r"(https?://S+)", res_text)
if link_match:
link = link_match.group(1).strip(" ""。")
# 假设引号中间的内容是标题
title_match = re.search(r"[是“"'](.*?)[”"']", res_text)
title = title_match.group(1) if title_match else "未识别标题"
if title != global_last_title:
return NewBlogPostEvent(title=title, link=link)
return StopEvent(result=f"解析彻底失败。原文: {res_text}")
@step
async def notify_step(self, ev: NewBlogPostEvent) -> StopEvent:
global global_last_title
print(f"--- [Notifier] 正在发送: {ev.title} ---")
await self.notifier.run(user_msg=f"发送通知:标题《{ev.title}》,链接 {ev.link}")
# 只有通知成功发出后,才真正更新全局基准
global_last_title = ev.title
return StopEvent(result=f"完成。新基准已设为: {ev.title}")
3.8 运行
async def main():
monitor = DemoBlogMonitor(timeout=120)
print("n>>> Run #1")
await monitor.run(rss_url="https://blog.algieba12.cn/atom.xml")
print("n>>> Run #2")
await monitor.run(rss_url="https://blog.algieba12.cn/atom.xml")
await main()
这里运行两遍,确认全局变量会同步更新。实际使用时可将其持久化存储,而不是用全局变量保存在内存中。
4. 代码
本篇博客代码示例可在 Kaggle 笔记本中找到。
5. 常见问题 (Q&A)
Q1: 为什么不让 Agent 直接判断是否有更新,而是要在 Python 代码里写 if current_title != global_last_title:?
A: 为了系统的绝对稳定性!(划重点)
大模型在 ReAct 模式下有时会产生“幻觉”或者表现得过于“客气”。
- 现象:最初的设计是让模型自己比对已知标题和新标题,结果模型看到传进去的旧标题占位符,误以为这是一道“填空题”,不仅没有比对,还很有礼貌地让填入真实标题。
- 结论:最佳实践是逻辑解耦。让 Agent 专注于它最擅长的事情(从杂乱网页中精准提取非结构化数据),而将确定性的逻辑判断(字符串比对)交还给传统的 Python 代码来做。
Q2: 为什么一定要强制模型输出 JSON 格式?之前用 | 这种自定义分隔符不行吗?
A: 因为自然语言的边界太模糊,且极易与真实数据冲突。
这是本次开发多智能体协作时遇到的最大痛点!
- 表达欲失控:高级模型(比如 Llama-3.3 或 Qwen)即使在 Prompt 里千叮咛万嘱咐只输出数据,也极大概率会在结果前后加上“好的,最新文章是…”之类的废话,导致
split("|")直接报错。 - 特殊符号冲突:碰巧博客的标题里本身就带有
|符号,正则匹配会产生严重的边界歧义。 - JSON 的优势:JSON 拥有明确的边界标识(花括号和双引号)。配合 Python 正则
re.search(r"({.*})")专门抓取大括号之间的内容,就能彻底免疫模型的“废话”。
Q3: 运行 Workflow 时碰到了 WorkflowValidationError: The following events are produced but never consumed 报错是怎么回事?
A: 这是 LlamaIndex 工作流引擎的一种“管道安全检查”机制。
它的意思是:check_step 生产了一个 NewBlogPostEvent,但工作流里没有任何 Step 声明要接收并处理这个事件,导致事件“无家可归”。
- 排查方法:检查消费者函数(在这里是
notify_step)。 - 解决方案:确保其参数严格加上了对应的类型注解,写成
async def notify_step(self, ev: NewBlogPostEvent)。这样引擎就能自动把上一步产生的数据路由到这里,完成闭环。
Q4: 调用 Groq 或硅基流动 API 时,频繁遇到 429 Too Many Requests 怎么办?
A: 这是因为多智能体循环请求过快,触发了并发限制。
在开发 ReAct Agent 时,模型需要反复执行“思考-行动-观察”的循环,短时间内会产生高密度的 API 请求。
- 冷却机制:如果是演示项目,可以在每次检测循环之间或者 Step 之间加一个简单的
await asyncio.sleep(2)进行冷却。 - 精简上下文:尽量精简 Prompt 和工具返回的数据量,避免单次请求打满 TPM(Tokens Per Minute)限制。如果是生产环境,建议使用硅基流动的付费版以获取更高的并发额度。
Q5: 在生产环境中,我还应该使用 global global_last_title 这种全局变量来管理状态吗?
A: 非常不建议。
本教程使用全局变量仅为了降低代码阅读门槛和方便演示。
- 状态污染:在实际生产环境中,Workflow 往往是高并发运行的(比如同时监控 10 个不同的 RSS 链接)。如果用全局变量,不同的任务会互相覆盖状态。
- 正确做法:LlamaIndex Workflow 提供了内置的
Context对象。应该在@step中通过ctx.set("last_title", title)和ctx.get("last_title")来在不同的步骤之间安全地传递状态,或者引入外部数据库(如 Redis、本地 JSON 文件)进行持久化。





