Llamaindex Workflow:事件驱动架构重塑复杂LLM应用构建
前文解读了Langchain于8月1日发布的Langgraph Studio,聚焦复杂Agent应用构建的调试瓶颈。同日,LLM应用开发领域的另一核心玩家Llamaindex也亮出了自己的方案:workflow,志在进一步强化应用编排能力。
Llamaindex去年就已有所布局,推出了Query Pipeline,采用声明式设计,允许用户将整个查询流程定义为DAG(有向无环图)。对常规RAG类流程而言,DAG足够简洁直观,这也是多数LLM workflow的标准选择。然而,一旦涉及Agent流程,标准DAG的架构缺陷便暴露无遗:它无法处理循环逻辑——即有环。而经典的ReAct流程恰恰是一个循环迭代的过程。
Llamaindex官方还列举了其他痛点:
1) Debug过程异常棘手;
2) 组件与模块的执行逻辑变得难以追溯;
3) Pipeline执行器的实现日趋臃肿,必须应对大量边界条件;
4) 复杂Pipeline的可读性严重衰退。
一旦在Query Pipeline中加入环,围绕图结构展开的各种用户体验问题就会被放大。具体而言:
1) 大量核心编排逻辑(如if-else分支和while循环)被迫塞入图的边上,导致边的定义冗长且难以维护。
2) 处理可选值与默认值的边界场景颇为棘手——作为框架,难以预判上游节点是否会传递某个参数。
3) 对构建Agent的开发者来说,用有环图定义流程并不自然。Agent本质上是LLM驱动的通用实体,接收观察并生成响应,而图的形式强制“Agent”节点明确定义输入和输出边,迫使用户编写与其他节点间冗长的通信模式。
这些瓶颈迫使Llamaindex团队重新审视架构设计的合理性。实际上,更早之前行业内就有人反思过:以DAG图为基础设计编排执行器,虽然直觉上很顺畅,但并非最优解。原因有两个:
第一,开发者需要从宏观上解析图中边与节点的关系,逻辑复杂度极高。尤其是复杂流程节点的处理及失败恢复,涉及大量的状态管理,导致图本身变得臃肿,编排器的实现也随之复杂化。
第二,这种做法违反了依赖倒置原则。选择应用编排的初衷是让图上的组件可复用、可插拔,组件不应关心自己所处的图结构——毕竟先有组件,才有具体的业务Pipeline。但上述方式迫使组件节点适配图的结构,既不利于组件沉淀,也增加了组件的开发难度。
于是Llamaindex另辟蹊径:采用事件驱动的模式来协调组件流程执行。简单来说,就是将图流程的调度转变为组件如何订阅并处理事件。这样一来,原本堆积在边上的处理逻辑内化为组件自身的行为,复杂度与依赖关系大幅降低。这种设计还能自然地支持重试、失败、超时、循环乃至human-in-loop等原本需要硬啃图结构的复杂逻辑。对于Pipeline执行器而言,只需做好消息分发与Context维护,再根据订阅情况唤醒相关组件即可,实现复杂度也显著简化。
下面展示Llamaindex的workflow具体写法:
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai import OpenAI
class OpenAIGenerator(Workflow):
@step()
async def generate(self, ev: StartEvent) -> StopEvent:
query = ev.get("query")
llm = OpenAI()
response = await llm.acomplete(query)
return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
上述示例定义了一个workflow类OpenAIGenerator,其中generate函数通过@step装饰器标记为一个workflow步骤。方法签名指定了它接收何种事件消息,返回值则定义该步骤执行后发布的消息类型。
Llamaindex同时给出了利用这种方式实现循环逻辑的示例:
class ExtractionDone(Event):
output: str
passage: str
class ValidationErrorEvent(Event):
error: str
wrong_output: str
passage: str
class ReflectionWorkflow(Workflow):
@step()
async def extract(
self, ev: StartEvent | ValidationErrorEvent
) -> StopEvent | ExtractionDone:
if isinstance(ev, StartEvent):
passage = ev.get("passage")
if not passage:
return StopEvent(result="Please provide some text in input")
reflection_prompt = ""
elif isinstance(ev, ValidationErrorEvent):
passage = ev.passage
reflection_prompt = REFLECTION_PROMPT.format(
wrong_answer=ev.wrong_output, error=ev.error
)
llm = Ollama(model="llama3", request_timeout=30)
prompt = EXTRACTION_PROMPT.format(
passage=passage, schema=CarCollection.schema_json()
)
if reflection_prompt:
prompt += reflection_prompt
output = await llm.acomplete(prompt)
return ExtractionDone(output=str(output), passage=passage)
@step()
async def validate(
self, ev: ExtractionDone
) -> StopEvent | ValidationErrorEvent:
try:
json.loads(ev.output)
except Exception as e:
print("Validation failed, retrying...")
return ValidationErrorEvent(
error=str(e), wrong_output=ev.output, passage=ev.passage
)
return StopEvent(result=ev.output)
w = ReflectionWorkflow(timeout=60, verbose=True)
result = await w.run(
passage="There are two cars a vailable: a Fiat Panda with 45Hp and a Honda Civic with 330Hp."
)
print(result)
在这个例子中,validate步骤接收实验性模式提取的结果作为事件,然后通过返回ValidationErrorEvent决定是否重试。该事件最终会被传递到extract步骤,后者再执行下一次尝试。这样就自然实现了循环迭代逻辑。
当然,仅靠阅读代码来理解复杂业务流程仍然吃力。Llamaindex也提供了类似LangGraph Studio的可视化能力,能够对执行流程进行可视化呈现,便于调试。
可见,Llamaindex在应对复杂LLM应用时,选择了与Langchain相近的策略:高代码配合可视化辅助调试。其中事件驱动的流程编排算是一个独特的设计。但值得思考的是,事件驱动本身完全可以与声明式、低代码的Pipeline开发融合。用户可以用直观的拖拽来编排整个流程,而底层的编排器实现则采用事件驱动方式,而非解析图的方式——这样岂不更优?甚至可以同时提供编程模式与低代码可视化两种模式,两者还能互操作,覆盖更广泛的开发者背景。这或许才是未来应用编排更成熟的形态。