事件驱动智能体构建实战:基于云端API从ReAct到Workflow完整教程

2026-06-11阅读 0热度 0
大模型

核心摘要 (TL;DR)

  • 架构进阶:从传统的 ReAct 循环升级为 LlamaIndex Workflow,掌握基于“事件驱动”的现代智能体编排范式。
  • 算力自由:摆脱本地 GPU 显存与运行时间的限制,实战接入 Groq 与硅基流动的高性能免费大模型 API。
  • 实战目标:利用 Workflow 重构博客监控程序,将其拆解为“检查员 (Checker)”与“通知官 (Notifier)”,实现多智能体的解耦与协作。

前言

之前我们用本地加载的 Hugging Face 模型跑了个 ReActAgent,完成了任务。但说实话,LlamaIndex 的“正经”玩法其实是 Workflow,而且更推荐用 API 调用,而不是本地硬扛模型。所以这篇就来聊聊 LlamaIndex 的 Workflow 流程,顺便带大家摆脱对 GPU 的依赖。

1. 什么是WorkFlow

之前咱们那种用法其实属于 QueryEngine 的范畴,就是把大模型当成一个查询工具。而 Workflow 是 LlamaIndex 推出的新一代编排引擎,概念上完全不一样。

1.1 核心逻辑

LlamaIndex 的 Workflow,本质上是一个事件驱动(Event-driven)的状态机。经常做开发的朋友应该一听就懂。咱们可以把整个流程想象成一场“接力赛”:

  • 选手(Steps):在代码里用 @step 装饰的函数,每个函数就是一个 step,负责具体的逻辑。
  • 接力棒(Events):自定义的类,继承自 Event,用于在 Step 之间传递数据。
  • 终点线(StopEvent):和普通 Event 一样,只是作为最后一棒。某个 Step 返回 StopEvent 就意味着“接力赛”结束,把结果返回给用户。

1.2 核心组件

  1. Event
    这是 Workflow 的灵魂。可以定义各种 Event,它们决定了数据流向哪里。
from llama_index.core.workflow import Event

class NewsExtractedEvent(Event):
    content: str  # 这里可以随便定义内容
    xxx: int # 多少条都可以
  1. @step
    逻辑处理单元。一个 Step 可以通过返回值来“发射”事件。

    • 生产者:返回一个 Event 实例。
    • 消费者:在下一个 Step 的函数参数中声明对应的 Event 类型。
  2. StartEvent & StopEvent
    流程的“起跑线”和“终点线”。通常只需要定义 StopEvent,调用 workflow.run() 时会自动触发 StartEvent。任何 Step 只要返回 StopEvent,整个 Workflow 就会停止并输出结果。

结合起来,就是一个极简的 Workflow 模板:

from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event

class ProcessedDataEvent(Event):
    data: str

class MyWorkflow(Workflow):
    @step
    async def ingest(self, ev: StartEvent) -> ProcessedDataEvent:
        # 1. 接收初始输入
        input_data = ev.get("input")
        return ProcessedDataEvent(data=input_data.upper())

    @step
    async def finish(self, ev: ProcessedDataEvent) -> StopEvent:
        # 2. 接收上一步处理后的数据
        return StopEvent(result=f"处理完成: {ev.data}")

# 运行
wf = MyWorkflow()
await wf.run(input="hello world") # -> "处理完成: HELLO WORLD"

通常,我们会把 step 定义成一个调用 LLM 处理的函数。后面就用 Workflow 重写上一篇中的 Agent。

2. API获取

这次换换口味,不用本地加载模型,而是用 API 的方式调用大模型。所以本篇的代码可以在没有显卡的电脑上运行,完全脱离 Kaggle 的 GPU 环境。
PS: 虽然调用 API 更方便,但本地加载或部署更安全。
这里介绍两个免费模型的 API 平台:Groq硅基流动

2.1 Groq

Groq 本身不是模型开发者,而是大模型的“搬运工”——一个推理加速平台。它的核心是自研的 LPU(Language Processing Unit),推理速度快得惊人。

  • Groq模型矩阵
模型系列模型 ID (API 调用名)厂商上下文窗口特点与优势
Llama 3.3llama-3.3-70b-versatileMeta128k最强逻辑,性能对标 GPT-4o
Llama 3.1llama-3.1-8b-instantMeta128k极速响应,适合简单逻辑与翻译
Llama 3.2llama-3.2-11b-vision-previewMeta128k多模态支持,可识别并理解图片
Gemma 2gemma2-9b-itGoogle8k指令遵循强,结构化输出更稳定
Mixtralmixtral-8x7b-32768Mistral32k经典的混合专家模型,长文本表现好
Whisperwhisper-large-v3OpenAI-极速语音转文字 (STT)
  • Groq的限制
    Groq 的免费额度给得非常慷慨,堪称业界良心。但为了防止滥用,也设置了不少限制,通常是对每个模型独立计算。限制维度包括 RPM(Requests Per Minute)、RPD(Requests Per Day)、TPM(Tokens Per Minute)。对于日常开发来说,完全够用。
  • 注册Groq Api Key

    • 进入 Groq 官网,点击右上角的 Start Building
    • 用谷歌账号登录即可。
    • 点击顶部菜单栏的 API KEYS
    • 点击 Create API Key,可以设置过期时间,保存下来后续配置到 Secrets 中。
  • 生成 API Key

2.2 硅基流动

硅基流动是第二个大模型“搬运工”。国内开发者用得比较多,上面有许多便宜大碗的模型,也有不少免费的模型可用。

  • 硅基流动模型矩阵
模型系列最新代表型号 (API 调用名)核心优势上下文窗口状态
DeepSeekdeepseek-ai/DeepSeek-V3.2性能对标顶级闭源模型,极强逻辑128k旗舰收费
DeepSeekdeepseek-ai/DeepSeek-R1推理增强模型,擅长复杂数学与逻辑64k部分免费
QwenQwen/Qwen3-30B-A3B阿里最新一代模型,中文语境指令遵循极强128k免费/收费
Qwen-VLQwen/Qwen3-VL-72B顶级视觉理解能力,支持视频分析32k旗舰收费
GLMZ.ai/GLM-5智谱旗舰,专注于长周期 Agent 任务205k旗舰收费
Kimimoonshotai/Kimi-K2.5原生多模态智能体模型,长文本处理极佳262k旗舰收费
Stepstepfun-ai/Step-3.5-Flash极高性价比的轻量模型,推理速度极快262k低成本/免费
MiniMaxMiniMaxAI/MiniMax-M2.5擅长角色扮演与情感交互197k旗舰收费
Llama 3meta-llama/Llama-3.3-70B-Instruct国际最强开源模型,适配各类 Agent 框架128k旗舰收费
  • 硅基流动的限制
    硅基流动的可用模型比 Groq 更多,有不少永久免费的模型,还有新用户赠金。但免费版的并发数较低,具体模型也有 RPM 和 TPM 限制。
  • 注册硅基流动API Key

    • 进入硅基流动官网,用手机号或微信号注册。
    • 点击左侧侧边栏的 API密钥,点击 新建API密钥

后面会使用 Qwen/Qwen3-8B 这个免费模型。

3. 代码实战

3.1 环境准备

这次和此前不同,我们用 API 不用自己加载模型,所以不用开启 GPU 加速,Accelerator 选择 None 即可。

先装依赖库:

# 1. 确保安装最新版 uv
!pip install -U uv

# 2. 使用修正后的包名进行安装
!uv pip install llama-index llama-index-llms-groq llama-index-llms-openai-like feedparser requests transformers

注意:硅基流动其实有专门的定制库 llama-index-llms-siliconflow,但这里为了通用性用了 llama-index-llms-openai-like。(实际上最初试了 Groq 效果不太好,想换中文模型试试,结果也不理想,这次主要还是介绍 Workflow 架构。)

3.2 配置密钥

和上次一样,在 Add-ons -> Secrets 中添加 Groq 和硅基流动的 API Key,分别命名为 GROQ_API_KEYSILICON-API_KEY(名字随意,和代码匹配即可)。

3.3 使用API来示例llm

Tools 和之前一样,不再赘述。

  • 定义Groq的模型
import os
from llama_index.llms.groq import Groq
from llama_index.core import Settings

os.environ["GROQ_API_KEY"] = user_secrets.get_secret("GROQ_API_KEY")

# 推荐使用 llama-3.3-70b,逻辑推理能力足以应对多智能体协作
Settings.llm = Groq(model="llama-3.3-70b-versatile")
  • 定义硅基流动的Qwen模型
from llama_index.llms.openai_like import OpenAILike

Settings.llm = OpenAILike(
    model="Qwen/Qwen3-8B",
    api_key=user_secrets.get_secret("SILICON-API_KEY"),
    api_base="https://api.siliconflow.cn/v1",
    is_chat_model=True
)

可见,定制接口只需定义对应的 API Key 到环境变量即可,通用接口则要补充模型名、api-key、api 的 url 等。

3.4 整体流程

计划定义两个 Agent:

  • Checker:负责获取最新博客并解析。
  • Notifier:负责发送通知。

当有新的博客时,Checker 向 Notifier 发送事件,告知标题和链接,然后结束。

整体流程如下:

3.5 定义Event

from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Context, Event

class NewBlogPostEvent(Event):
    title: str
    link: str

由于 Checker 流向 Notifier 只需要标题和链接,所以 NewBlogPostEvent 只包含 titlelink 字段。

3.6 定义两个合作的Agent

先定义 Checker:需要明确其角色(ROLE)任务(TASK)输出要求(OUTPUT_FORMAT),并赋予其所需工具。

from llama_index.core.agent.workflow import ReActAgent

CHECKER_SYSTEM_PROMPT = """
### ROLE
咱们是一个极其精准的数据提取专家。

### TASK
1. 调用工具获取最新博客文章。
2. 提取其标题和链接。
3. **必须**以 JSON 格式输出。

### OUTPUT_FORMAT
{"title": "文章标题", "link": "文章链接"}
"""
checker = ReActAgent(
            name="checker",
            system_prompt=CHECKER_SYSTEM_PROMPT,
            tools=[tool_get_blog],
            llm=Settings.llm
        )

再定义 Notifier

NOTIFIER_SYSTEM_PROMPT = """
### ROLE
咱们是一个通知官。

### TASK
请根据提供的信息发送通知。
直接调用工具发送,不要解释咱们的行为。发送成功后回复“已发出”即可。
"""
notifier = ReActAgent(
            name="notifier",
            system_prompt=NOTIFIER_SYSTEM_PROMPT,
            tools=[tool_send_all],
            llm=Settings.llm
        )

3.7 定义Workflow

  • Workflow 类派生出我们的 workflow 类。
  • 定义 step,从 StartEvent 开始到 StopEvent 结束。

由于模型的指令跟随能力较弱,格式化输出有些困难,这里用了一个正则来兜底。

import os
import re
import json

global_last_title = ""

class DemoBlogMonitor(Workflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.checker = checker
        self.notifier = notifier

    @step
    async def check_step(self, ev: StartEvent) -> NewBlogPostEvent | StopEvent:
        global global_last_title
        rss_url = ev.get("rss_url")

        print(f"--- [Checker] 目标: {rss_url} ---")
        print(f"--- [Checker] 基准: '{global_last_title}' ---")

        response = await self.checker.run(user_msg=f"请提取此 RSS 的最新数据:{rss_url}")
        res_text = str(response)
        print(f"DEBUG [Agent Output]: {res_text}")

        try:
            json_match = re.search(r"({.*})", res_text, re.DOTALL)
            if not json_match:
                raise ValueError("未在回复中找到 JSON 块")

            data = json.loads(json_match.group(1))
            current_title = data.get("title", "").strip()
            current_link = data.get("link", "").strip()

            if not current_title or not current_link:
                raise ValueError("JSON 数据缺失字段")

            # 在 Python 侧进行确定性比对
            if current_title != global_last_title:
                print(f"发现新动态: {current_title}")
                # 只在成功触发通知时更新状态
                return NewBlogPostEvent(title=current_title, link=current_link)
            else:
                return StopEvent(result=f"判定:内容未更新")

        except Exception as e:
            # 如果 JSON 解析失败,尝试最后的正则兜底
            print(f"WARN: JSON 解析失败 ({e}), 尝试正则兜底...")
            link_match = re.search(r"(https?://S+)", res_text)
            if link_match:
                link = link_match.group(1).strip(" ""。")
                # 假设引号中间的内容是标题
                title_match = re.search(r"[是“"'](.*?)[”"']", res_text)
                title = title_match.group(1) if title_match else "未识别标题"

                if title != global_last_title:
                    return NewBlogPostEvent(title=title, link=link)

            return StopEvent(result=f"解析彻底失败。原文: {res_text}")

    @step
    async def notify_step(self, ev: NewBlogPostEvent) -> StopEvent:
        global global_last_title
        print(f"--- [Notifier] 正在发送: {ev.title} ---")

        await self.notifier.run(user_msg=f"发送通知:标题《{ev.title}》,链接 {ev.link}")

        # 只有通知成功发出后,才真正更新全局基准
        global_last_title = ev.title
        return StopEvent(result=f"完成。新基准已设为: {ev.title}")

3.8 运行

async def main():
    monitor = DemoBlogMonitor(timeout=120)

    print("n>>> Run #1")
    await monitor.run(rss_url="https://blog.algieba12.cn/atom.xml")

    print("n>>> Run #2")
    await monitor.run(rss_url="https://blog.algieba12.cn/atom.xml")
await main()

这里运行两遍,确认全局变量会同步更新。实际使用时可将其持久化存储,而不是用全局变量保存在内存中。

4. 代码

本篇博客代码示例可在 Kaggle 笔记本中找到。

5. 常见问题 (Q&A)

Q1: 为什么不让 Agent 直接判断是否有更新,而是要在 Python 代码里写 if current_title != global_last_title:
A: 为了系统的绝对稳定性!(划重点)
大模型在 ReAct 模式下有时会产生“幻觉”或者表现得过于“客气”。

  • 现象:最初的设计是让模型自己比对已知标题和新标题,结果模型看到传进去的旧标题占位符,误以为这是一道“填空题”,不仅没有比对,还很有礼貌地让填入真实标题。
  • 结论:最佳实践是逻辑解耦。让 Agent 专注于它最擅长的事情(从杂乱网页中精准提取非结构化数据),而将确定性的逻辑判断(字符串比对)交还给传统的 Python 代码来做。

Q2: 为什么一定要强制模型输出 JSON 格式?之前用 | 这种自定义分隔符不行吗?
A: 因为自然语言的边界太模糊,且极易与真实数据冲突。
这是本次开发多智能体协作时遇到的最大痛点!

  • 表达欲失控:高级模型(比如 Llama-3.3 或 Qwen)即使在 Prompt 里千叮咛万嘱咐只输出数据,也极大概率会在结果前后加上“好的,最新文章是…”之类的废话,导致 split("|") 直接报错。
  • 特殊符号冲突:碰巧博客的标题里本身就带有 | 符号,正则匹配会产生严重的边界歧义。
  • JSON 的优势:JSON 拥有明确的边界标识(花括号和双引号)。配合 Python 正则 re.search(r"({.*})") 专门抓取大括号之间的内容,就能彻底免疫模型的“废话”。

Q3: 运行 Workflow 时碰到了 WorkflowValidationError: The following events are produced but never consumed 报错是怎么回事?
A: 这是 LlamaIndex 工作流引擎的一种“管道安全检查”机制。
它的意思是:check_step 生产了一个 NewBlogPostEvent,但工作流里没有任何 Step 声明要接收并处理这个事件,导致事件“无家可归”。

  • 排查方法:检查消费者函数(在这里是 notify_step)。
  • 解决方案:确保其参数严格加上了对应的类型注解,写成 async def notify_step(self, ev: NewBlogPostEvent)。这样引擎就能自动把上一步产生的数据路由到这里,完成闭环。

Q4: 调用 Groq 或硅基流动 API 时,频繁遇到 429 Too Many Requests 怎么办?
A: 这是因为多智能体循环请求过快,触发了并发限制。
在开发 ReAct Agent 时,模型需要反复执行“思考-行动-观察”的循环,短时间内会产生高密度的 API 请求。

  • 冷却机制:如果是演示项目,可以在每次检测循环之间或者 Step 之间加一个简单的 await asyncio.sleep(2) 进行冷却。
  • 精简上下文:尽量精简 Prompt 和工具返回的数据量,避免单次请求打满 TPM(Tokens Per Minute)限制。如果是生产环境,建议使用硅基流动的付费版以获取更高的并发额度。

Q5: 在生产环境中,我还应该使用 global global_last_title 这种全局变量来管理状态吗?
A: 非常不建议。
本教程使用全局变量仅为了降低代码阅读门槛和方便演示。

  • 状态污染:在实际生产环境中,Workflow 往往是高并发运行的(比如同时监控 10 个不同的 RSS 链接)。如果用全局变量,不同的任务会互相覆盖状态。
  • 正确做法:LlamaIndex Workflow 提供了内置的 Context 对象。应该在 @step 中通过 ctx.set("last_title", title)ctx.get("last_title") 来在不同的步骤之间安全地传递状态,或者引入外部数据库(如 Redis、本地 JSON 文件)进行持久化。
免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策