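OpenClaw Agent Framework Review: Building Custom Agents with PI
A look at the PI technology stack behind OpenClaw, with a hands-on walkthrough of building your own AI Agent framework for production-grade agent development.
Key points:
1. The PI stack layer by layer: from LLM communication up to a complete Agent runtime
2. Hands-on demo: combining the layers to build a codebase assistant
3. How OpenClaw adapts the stack for production, plus pointers for going further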
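As OpenClaw takes off, the technology stack behind it, pi-mono, will probably draw a lot more attention soon.
What is PI? In short, it is a toolkit for building AI Agents. It is a monorepo containing several packages that build on one another:
- • pi-ai handles communication with LLMs across providers.
- • pi-agent-core builds on pi-ai and adds an Agent loop with tool calling.
- • pi-coding-agent gives you a complete coding Agent with built-in tools, session persistence, and extensibility.
- • pi-tui is for building polished terminal interfaces.
These are the core packages that power OpenClaw. This guide takes them apart layer by layer and walks you through building a full-featured Agent with a terminal UI, persistent sessions, and custom tools.
Once you understand how the layers fit together, you can build production-grade Agent software your own way, without being locked into any particular framework's abstractions.
Pi is an open-source project created by @badlogicgames.
This is a long article, so skim the table of contents first and jump to what interests you: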
- • The stack
- • Layer 1: pi-ai
- • Layer 2: pi-agent-core
- • Layer 3: pi-coding-agent
- • Building something useful: a codebase assistant
- • How OpenClaw adapts this for production
- • Going further
On to the article.
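The stack
The architecture is layered: each layer adds new capabilities on top of the one below, and the layers are cleanly isolated.
pi-ai - One unified interface for calling almost any LLM. The supported list is long: Anthropic, OpenAI, Google, Bedrock, Mistral, Groq, xAI, OpenRouter, Ollama, and, for users in China, MiniMax and Zhipu. Streaming, completions, tool definitions, and cost tracking are all included.
pi-agent-core - Wraps pi-ai in an Agent loop. You define the tools; it calls the LLM, executes tools, feeds the results back, and repeats until the task is done.
pi-coding-agent - A complete Agent runtime, with built-in file operations, bash commands, JSONL session persistence, context compaction, and a skills and extensions system.
pi-tui - A terminal UI library with differential rendering. It offers Markdown rendering, a multi-line editor with autocomplete, and loading spinners, all without flicker on screen updates.
Prerequisites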
- • Node.js 20+
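- • An API key for at least one provider. Users in China can use models such as Zhipu's GLM-5 or MiniMax M2.5.
Example code and notes
All the code in this article is open source and uses MiniMax M2.5 as the backend LLM. Configure the matching API key in .env and it will run.
Or set your API key directly as an environment variable: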
export MINIMAX_API_KEY=sk-api-...
# or
export OPENAI_API_KEY=sk-...
Layer 1: pi-ai
Your first LLM call
Create basics.ts:
import { getModel, completeSimple } from "@mariozechner/pi-ai";
async function main() {
const model = getModel("minimax", "MiniMax-M2.5");
const response = await completeSimple(model, {
systemPrompt: "You are a helpful assistant.",
messages: [
{ role: "user", content: "What is the capital of China?", timestamp: Date.now() }
],
});
// response is an AssistantMessage
for (const block of response.content) {
if (block.type === "text") {
console.log(block.text);
}
}
console.log(`Tokens: ${response.usage.totalTokens}`);
console.log(`Stop reason: ${response.stopReason}`);
}
main();
Run it:
npx tsx basics.ts
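getModel looks up the model by provider and model ID in PI's built-in catalog of more than 2,000 models. completeSimple sends the messages and returns a complete AssistantMessage once the model finishes.
The response has a .content array of typed blocks: text, thinking, or toolCall. It also has .usage for token counts and .stopReason, which tells you why the model stopped ("stop", "toolUse", "length", "error", or "aborted").
Streaming
completeSimple waits for the full response before returning. To see output in real time, typewriter style, use streamSimple: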
import { getModel, streamSimple } from "@mariozechner/pi-ai";
async function main() {
const model = getModel("minimax", "MiniMax-M2.5");
const stream = streamSimple(model, {
systemPrompt: "You are a helpful assistant.",
messages: [
{ role: "user", content: "Explain the TCP/IP handshake in three sentences", timestamp: Date.now() }
],
});
for await (const event of stream) {
switch (event.type) {
case "text_delta":
process.stdout.write(event.delta);
break;
case "done":
console.log(`\nTokens: ${event.message.usage.totalTokens}`);
break;
case "error":
console.error("Error:", event.error.errorMessage);
break;
}
}
}
main();
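Every provider has its own stream format: Anthropic, OpenAI, and Google each do it differently. streamSimple normalizes them all into one set of events: start, text_start, text_delta, text_end, thinking_start/delta/end, toolcall_start/delta/end, done, and error.
In other words, you write the stream-handling logic once and it works with any provider. In most cases you mainly care about text_delta (text fragments) and done (the final message).
You can also just wait for the final message: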
const stream = streamSimple(model, context);
const finalMessage = await stream.result(); // AssistantMessage
Switching providers
To switch providers, change the single getModel call; the rest of the code stays the same.
// Just change this line - everything else stays the same
const model = getModel("anthropic", "claude-opus-4-5");
// const model = getModel("openai", "gpt-4o");
// const model = getModel("google", "gemini-2.5-pro");
// const model = getModel("groq", "llama-3.3-70b-versatile");
const stream = streamSimple(model, context);
Each provider needs its own API key set in the environment (ANTHROPIC_API_KEY, OPENAI_API_KEY, GEMINI_API_KEY, MINIMAX_API_KEY, and so on).
You can also define a custom model for a self-hosted endpoint:
import type { Model } from "@mariozechner/pi-ai";
const localModel: Model<"openai-completions"> = {
id: "llama-3.1-8b",
name: "llama-3.1-8b",
api: "openai-completions",
provider: "ollama",
baseUrl: "http://localhost:11434/v1",
reasoning: false,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 128000,
maxTokens: 8192,
};
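Under the hood, pi-ai uses each vendor's official SDK (OpenAI SDK, Anthropic SDK, and so on). The api field decides which SDK handles the request: "openai-completions" routes through the OpenAI SDK, which is why it works with any OpenAI-compatible endpoint (Ollama, vLLM, Mistral, and others).
API keys are resolved automatically from environment variables based on the provider name and passed to the SDK for authentication. Ollama needs no authentication, so the example above works as is. For providers that need a key, set the environment variable or pass it directly: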
const stream = streamSimple(localModel, context, {
apiKey: "your-api-key",
});
Thinking levels
For models that support extended thinking (Claude, o3, Gemini 2.5, and the like), PI enables it through the reasoning option. It is off by default.
const stream = streamSimple(model, context, {
reasoning: "high", // "minimal" | "low" | "medium" | "high" | "xhigh"
});
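When enabled, the stream emits thinking_delta events in addition to text_delta.
Layer 2: pi-agent-core
pi-ai lets you talk to an LLM. pi-agent-core lets the LLM respond by using tools.
The Agent class runs the standard Agent loop: send messages to the LLM, execute any tools it calls, feed the results back, and repeat until the model stops.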
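To make the loop concrete, here is a minimal, self-contained sketch of the same idea with a scripted stand-in for the model. Every name in it (runLoop, Llm, Tool, scriptedLlm) is hypothetical and chosen for illustration; it is not the pi-agent-core implementation, which also handles streaming, validation, and cancellation.

```typescript
// Minimal agent-loop sketch. All names are hypothetical, not the pi-agent-core API.
type ToolCall = { id: string; name: string; args: Record<string, unknown> };
type Message =
  | { role: "user"; text: string }
  | { role: "assistant"; text: string; toolCalls: ToolCall[] }
  | { role: "toolResult"; toolCallId: string; text: string };

type Tool = { name: string; execute: (args: Record<string, unknown>) => string };
type Llm = (messages: Message[]) => { text: string; toolCalls: ToolCall[] };

function runLoop(llm: Llm, tools: Tool[], prompt: string, maxTurns = 10): Message[] {
  const messages: Message[] = [{ role: "user", text: prompt }];
  for (let turn = 0; turn < maxTurns; turn++) {
    const reply = llm(messages);
    messages.push({ role: "assistant", text: reply.text, toolCalls: reply.toolCalls });
    if (reply.toolCalls.length === 0) break; // no tool calls: the model is done
    for (const call of reply.toolCalls) {
      const tool = tools.find((t) => t.name === call.name);
      const text = tool ? tool.execute(call.args) : `Unknown tool: ${call.name}`;
      messages.push({ role: "toolResult", toolCallId: call.id, text });
    }
  }
  return messages;
}

// Scripted stand-in for a model: requests the tool once, then answers.
const scriptedLlm: Llm = (messages) => {
  const last = messages[messages.length - 1];
  if (last.role === "user") {
    return { text: "", toolCalls: [{ id: "call-1", name: "get_weather", args: { city: "Tokyo" } }] };
  }
  return { text: `Done: ${last.text}`, toolCalls: [] };
};

const weather: Tool = { name: "get_weather", execute: (args) => `${String(args.city)}: 20C` };
const transcript = runLoop(scriptedLlm, [weather], "Weather in Tokyo?");
console.log(transcript.map((m) => m.role).join(" -> "));
// prints "user -> assistant -> toolResult -> assistant"
```

The maxTurns cap is a design choice of this sketch: it keeps a misbehaving model from looping forever.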
Defining tools
Tool parameters are defined with TypeBox schemas, which keeps them type-safe:
import { Type } from "@mariozechner/pi-ai";
import type { AgentTool } from "@mariozechner/pi-agent-core";
const weatherParams = Type.Object({
city: Type.String({ description: "City name" }),
});
const weatherTool: AgentTool<typeof weatherParams> = {
name: "get_weather",
label: "Weather",
description: "Get the current weather for a city",
parameters: weatherParams,
execute: async (toolCallId, params, signal, onUpdate) => {
// params is typed: { city: string }
const temp = Math.round(Math.random() * 30);
return {
content: [{ type: "text", text: `${params.city}: ${temp}C, partly cloudy` }],
details: { temp, city: params.city },
};
},
};
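Define the schema as a standalone variable and pass it to AgentTool as a generic parameter, so TypeScript can infer the type of params inside execute.
Each tool contains:
- • name - the identifier the LLM uses to call it
- • label - a human-readable display name
- • description - tells the LLM when and how to use the tool
- • parameters - a TypeBox schema; validated with AJV before execution
- • execute - the logic that runs when the LLM calls the tool; returns content (sent back to the LLM) and details (for your UI, not sent to the LLM)
The onUpdate callback lets you stream partial results during execution, which is especially useful for long-running tools such as bash commands.
Creating the Agent
Connect the weather tool above to a model and a streaming function: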
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
const model = getModel("minimax", "MiniMax-M2.5");
const agent = new Agent({
initialState: {
systemPrompt: "You are a helpful assistant with access to tools.",
model,
tools: [weatherTool],
thinkingLevel: "off",
},
streamFn: streamSimple,
});
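The Agent takes an initialState (system prompt, model, tools, thinking level) and a streamFn, the function that actually calls the LLM. Pass in streamSimple from pi-ai and the Agent is connected to whichever provider the model specifies.
The event stream
Subscribe to events to see what the Agent is doing in real time: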
agent.subscribe((event) => {
switch (event.type) {
case "agent_start":
console.log("Agent started");
break;
case "message_update":
// Streaming text from the LLM
if (event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
}
break;
case "tool_execution_start":
console.log(`\nTool: ${event.toolName}(${JSON.stringify(event.args)})`);
break;
case "tool_execution_end":
console.log(`Result: ${event.isError ? "ERROR" : "OK"}`);
break;
case "agent_end":
console.log("\nAgent finished");
break;
}
});
The full list of events:
agent_start
agent_end
turn_start
turn_end
message_start
message_update
message_end
tool_execution_start
tool_execution_update
tool_execution_end
Running the Agent
await agent.prompt("What's the weather in Tokyo and London?");
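That one line is all it takes. The Agent handles the rest:
- 1. Your message is sent to the LLM
- 2. The LLM decides to call get_weather for Tokyo
- 3. The Agent executes the tool and feeds the result back
- 4. The LLM calls get_weather again for London
- 5. The Agent executes it again and feeds the result back
- 6. The LLM produces the final text reply
You do not need to write any loop by hand. The Agent takes care of it.
Full example
Here is a complete working Agent with two tools (list_files and read_file):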
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
import { Type } from "@mariozechner/pi-ai";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import * as fs from "fs";
const readFileParams = Type.Object({
path: Type.String({ description: "Path to the file" }),
});
const readFileTool: AgentTool<typeof readFileParams> = {
name: "read_file",
label: "Read File",
description: "Read the contents of a file",
parameters: readFileParams,
execute: async (_id, params) => {
try {
const content = fs.readFileSync(params.path, "utf-8");
return {
content: [{ type: "text", text: content }],
details: {},
};
} catch (err: any) {
return {
content: [{ type: "text", text: `Error: ${err.message}` }],
details: {},
};
}
},
};
const listFilesParams = Type.Object({
path: Type.String({ description: "Directory path", default: "." }),
});
const listFilesTool: AgentTool<typeof listFilesParams> = {
name: "list_files",
label: "List Files",
description: "List files in a directory",
parameters: listFilesParams,
execute: async (_id, params) => {
const files = fs.readdirSync(params.path);
return {
content: [{ type: "text", text: files.join("\n") }],
details: { count: files.length },
};
},
};
async function main() {
const model = getModel("minimax", "MiniMax-M2.5");
const agent = new Agent({
initialState: {
systemPrompt: "You can read files and list directories. Be concise.",
model,
tools: [readFileTool, listFilesTool],
thinkingLevel: "off",
},
streamFn: streamSimple,
});
agent.subscribe((event) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
}
if (event.type === "tool_execution_start") {
console.log(`\n[${event.toolName}] ${JSON.stringify(event.args)}`);
}
});
await agent.prompt("What files are in the current directory? Read the package.json if it exists.");
console.log();
}
main();
Steering and follow-ups
If the Agent is running and you want to interrupt it and change direction:
// Interrupt: delivered after the current tool finishes.
// Remaining pending tools are skipped.
agent.steer({
role: "user",
content: "Actually, skip that and read tsconfig.json instead.",
timestamp: Date.now(),
});
// Follow-up: queued for after the agent finishes naturally.
// Doesn't interrupt current work.
agent.followUp({
role: "user",
content: "Now summarize what you found.",
timestamp: Date.now(),
});
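steer is an interrupt: it skips any tools that have not yet run and injects your new message. followUp waits: it queues the message and processes it only after the Agent finishes naturally. OpenClaw uses steering for live user messages (for example, someone types while the Agent is still working) and follow-ups for programmatic chaining.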
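The difference between the two queues can be modeled in a few lines. This is a toy sketch under stated assumptions: QueueSketch, runTools, and finish are hypothetical names, and the real Agent would run another LLM turn after delivering a steering message rather than just logging it.

```typescript
// Toy model of steering vs. follow-up queues. Hypothetical names, not the pi-agent-core API.
class QueueSketch {
  private steering: string[] = [];
  private followUps: string[] = [];
  readonly log: string[] = [];

  steer(msg: string): void {
    this.steering.push(msg);
  }

  followUp(msg: string): void {
    this.followUps.push(msg);
  }

  // Runs a batch of pending tool calls; onToolDone lets the caller inject messages mid-run.
  runTools(tools: string[], onToolDone?: (tool: string) => void): void {
    for (let i = 0; i < tools.length; i++) {
      this.log.push(`run ${tools[i]}`);
      onToolDone?.(tools[i]);
      if (this.steering.length > 0) {
        // A steering message arrived: skip the remaining tools and deliver it now.
        for (const skipped of tools.slice(i + 1)) this.log.push(`skip ${skipped}`);
        for (const msg of this.steering.splice(0)) this.log.push(`steer: ${msg}`);
        return;
      }
    }
  }

  // Called when the agent has nothing left to do: only now are follow-ups delivered.
  finish(): void {
    for (const msg of this.followUps.splice(0)) this.log.push(`follow-up: ${msg}`);
  }
}

const q = new QueueSketch();
q.followUp("Now summarize.");
q.runTools(["read a.ts", "read b.ts", "read c.ts"], (tool) => {
  if (tool === "read a.ts") q.steer("Read tsconfig.json instead.");
});
q.finish();
console.log(q.log.join("\n"));
```

Even though the follow-up was queued first, it is delivered last, while the steering message cuts in right after the tool that was running when it arrived.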
State management
The Agent's configuration can be changed at any time:
agent.setModel(getModel("openai", "gpt-4o")); // Switch providers mid-session
agent.setThinkingLevel("high"); // Enable extended thinking
agent.setSystemPrompt("New instructions."); // Update the system prompt
agent.setTools([...newTools]); // Swap the tool set
agent.replaceMessages(trimmedMessages); // Replace conversation history
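These changes take effect on the next turn.
Layer 3: pi-coding-agent
pi-agent-core provides the loop. pi-coding-agent gives you a production-ready Agent with built-in tools, session persistence, and extensibility already wired up. It is built on pi-agent-core, so when you use pi-coding-agent you are already using pi-agent-core underneath.
Most users should start here. Use pi-agent-core directly only when you need an Agent that does not use the built-in coding tools, or one with its own session system.
Built-in tools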
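pi-coding-agent ships with 7 built-in tools. Four are active by default (codingTools); the other three are available but off by default:
Default tools (active):
| Tool | What it does |
| read | Reads file contents and images (jpg, png, gif, webp). Images are returned as attachments. Text output is truncated to 2000 lines or 50KB. Supports offset/limit for paging through large files. |
| bash | Runs a shell command in the working directory. Returns stdout and stderr, truncated to the last 2000 lines or 50KB. Supports a timeout in seconds. |
| edit | Replaces exactly matching text in a file. oldText must match exactly, including whitespace. Suited to precise, surgical changes. |
| write | Writes content to a file. Creates the file if it does not exist, overwrites it if it does, and creates parent directories as needed. |
Optional tools:
| Tool | What it does |
| grep | Searches file contents with a regex or literal pattern. Returns matching lines, file paths, and line numbers. Respects .gitignore. Uses ripgrep under the hood. |
| find | Finds files by glob pattern. Returns matching paths relative to the search directory. Also respects .gitignore. |
| ls | Lists directory contents. Entries are sorted alphabetically, directories carry a / suffix, and hidden files (dotfiles) are included. |
These tools are grouped into presets: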
import { codingTools, readOnlyTools } from "@mariozechner/pi-coding-agent";
codingTools; // [read, bash, edit, write] - default
readOnlyTools; // [read, grep, find, ls] - exploration without modification
Or pick just the ones you want:
import { allBuiltInTools, createAgentSession, SessionManager } from "@mariozechner/pi-coding-agent";
// allBuiltInTools.read, allBuiltInTools.bash, allBuiltInTools.edit,
// allBuiltInTools.write, allBuiltInTools.grep, allBuiltInTools.find, allBuiltInTools.ls
const { session } = await createAgentSession({
model,
tools: [allBuiltInTools.read, allBuiltInTools.bash, allBuiltInTools.grep],
sessionManager: SessionManager.inMemory(),
});
createAgentSession
createAgentSession is the key function that wires everything together: model, tools, session persistence, and settings.
import { createAgentSession, SessionManager } from "@mariozechner/pi-coding-agent";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
async function main() {
const model = getModel("minimax", "MiniMax-M2.5");
const { session } = await createAgentSession({
model,
thinkingLevel: "off",
sessionManager: SessionManager.inMemory(),
});
session.agent.streamFn = streamSimple;
session.subscribe((event) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
}
if (event.type === "tool_execution_start") {
console.log(`\n[${event.toolName}]`);
}
});
await session.prompt("What files are in the current directory? Summarize the package.json.");
console.log();
session.dispose();
}
main();
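That is a working coding Agent. It can read your files, run commands, edit code, and write new files. SessionManager.inMemory() means the session lives in memory and disappears when the process exits.
Session persistence
To make sessions persist, point SessionManager at a file: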
import * as path from "path";
const sessionFile = path.join(process.cwd(), ".sessions", "my-session.jsonl");
const sessionManager = SessionManager.open(sessionFile);
const { session } = await createAgentSession({
model,
sessionManager,
});
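Sessions are stored as JSONL files with a tree structure: every entry has an id and a parentId. That lets you branch: jump to any earlier point in the conversation and continue from there without losing any history.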
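As a rough illustration of how an id/parentId tree yields a linear conversation, the sketch below walks from a leaf back to the root. Only id and parentId come from the description above; the Entry shape, the text field, and branchFromLeaf are assumptions made for this example, not the SessionManager internals.

```typescript
// Hypothetical entry shape: only id and parentId are taken from the article.
type Entry = { id: string; parentId: string | null; text: string };

// Walk parentId links from a leaf back to the root, then reverse to get the branch in order.
function branchFromLeaf(entries: Entry[], leafId: string): Entry[] {
  const byId = new Map<string, Entry>();
  for (const e of entries) byId.set(e.id, e);

  const branch: Entry[] = [];
  let current = byId.get(leafId);
  while (current) {
    branch.push(current);
    current = current.parentId === null ? undefined : byId.get(current.parentId);
  }
  return branch.reverse();
}

const sessionEntries: Entry[] = [
  { id: "1", parentId: null, text: "user: list files" },
  { id: "2", parentId: "1", text: "assistant: a.ts, b.ts" },
  { id: "3", parentId: "2", text: "user: read a.ts" },
  { id: "4", parentId: "2", text: "user: read b.ts instead" }, // second branch from entry 2
];

console.log(branchFromLeaf(sessionEntries, "4").map((e) => e.id).join(" > "));
// prints "1 > 2 > 4"
```

Entry 3 is still in the list after branching; it is simply not on the path from leaf 4 to the root, which is why no history is lost.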
SessionManager has several static factory methods. Pick the one that fits and pass it to createAgentSession:
// Option 1: In-memory (ephemeral, nothing written to disk)
const sessionManager = SessionManager.inMemory();
// Option 2: New persistent session in ~/.pi/agent/sessions/
const sessionManager = SessionManager.create(process.cwd());
// Option 3: Open a specific session file
const sessionManager = SessionManager.open("/path/to/session.jsonl");
// Option 4: Continue the most recent session (or create new if none exists)
const sessionManager = SessionManager.continueRecent(process.cwd());
// Then pass whichever one you chose:
const { session } = await createAgentSession({ model, sessionManager });
You can also list the existing sessions for a directory:
const sessions = await SessionManager.list(process.cwd());
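Once you have a SessionManager you usually do not need to call its methods directly; createAgentSession handles most of the wiring. If you are building custom session logic, though (as OpenClaw does for multi-channel routing), these are the key methods: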
// Reconstruct the conversation from the JSONL file.
// Use this when you need to inspect or display the current conversation
// outside of the agent session (e.g., showing history in a web UI).
const { messages, thinkingLevel, model } = sessionManager.buildSessionContext();
// Get the last entry in the current branch.
// Useful for checking what the most recent message was,
// or grabbing an entry ID to branch from.
const leaf = sessionManager.getLeafEntry();
// Fork the conversation from a specific point.
// Everything after entryId is abandoned (but still in the file).
// The agent continues from that point on the next prompt.
// OpenClaw uses this for "retry from here" flows.
sessionManager.branch(entryId);
// Manually append a message to the session transcript.
// createAgentSession does this automatically during prompt(),
// but you'd use it to inject messages programmatically -
// e.g., adding a system notification or a cron-triggered prompt.
sessionManager.appendMessage(message);
// Get the full tree structure of the session.
// Each node has children, so you can render a branch selector
// or let users navigate conversation history.
const tree = sessionManager.getTree();
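OpenClaw uses one session file per channel thread: ~/.openclaw/agents//sessions/. Each conversation is therefore independent and crash-resistant (JSONL is append-only, so a crash loses at most one line).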
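The "lose at most one line" property follows from how append-only JSONL is read back. Here is a minimal sketch of a reader that tolerates a truncated final line; parseJsonl is a hypothetical helper written for this example and says nothing about how pi itself parses session files.

```typescript
// Parse append-only JSONL, tolerating a final line that was cut off by a crash.
// parseJsonl is a hypothetical helper, not part of pi.
function parseJsonl(raw: string): { entries: unknown[]; droppedTail: boolean } {
  const lines = raw.split("\n").filter((line) => line.trim() !== "");
  const entries: unknown[] = [];
  let droppedTail = false;

  lines.forEach((line, index) => {
    try {
      entries.push(JSON.parse(line));
    } catch {
      if (index === lines.length - 1) {
        droppedTail = true; // only the last line can be a partial write
      } else {
        throw new Error(`Corrupt JSONL at line ${index + 1}`);
      }
    }
  });

  return { entries, droppedTail };
}

// Two complete entries followed by a write that was interrupted mid-line.
const rawFile = '{"id":"1","parentId":null}\n{"id":"2","parentId":"1"}\n{"id":"3","par';
const { entries: recovered, droppedTail } = parseJsonl(rawFile);
console.log(recovered.length, droppedTail);
// prints "2 true"
```

Because earlier lines are never rewritten, a crash can only damage the line being appended at that moment.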
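Using the tool factories
Prebuilt tool arrays such as codingTools and readOnlyTools are singletons that operate on the process's current working directory. To make tools operate on a specific directory, use the factory functions: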
import {
createCodingTools,
createReadOnlyTools,
createReadTool,
createBashTool,
createGrepTool,
} from "@mariozechner/pi-coding-agent";
// Create preset groups scoped to a workspace
const customCodingTools = createCodingTools("/path/to/workspace"); // [read, bash, edit, write]
const customReadOnlyTools = createReadOnlyTools("/path/to/workspace"); // [read, grep, find, ls]
// Or create individual tools - there's a factory for each built-in tool
const customRead = createReadTool("/path/to/workspace");
const customBash = createBashTool("/path/to/workspace");
const customGrep = createGrepTool("/path/to/workspace");
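Each factory also accepts an optional operations object that overrides the underlying I/O. This is useful if you want to run inside a Docker container, execute over SSH, or operate on a virtual filesystem: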
// Read files from a remote server instead of the local disk
const remoteRead = createReadTool("/workspace", {
operations: {
readFile: async (path) => fetchFileFromRemote(path),
access: async (path) => checkRemoteFileExists(path),
},
});
// Execute commands in a Docker sandbox instead of the host
const sandboxedBash = createBashTool("/workspace", {
operations: {
exec: async (command, cwd, opts) => runInDockerContainer(command, cwd, opts),
},
});
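OpenClaw uses these factories to create workspace-scoped tools for each Agent, then wraps them in extra middleware: permission checks, image normalization for the read tool, and parameter aliases for Claude Code compatibility (file_path → path, old_string → oldText).
Custom tools alongside built-in tools
The built-in tools cover file operations and shell commands. For anything else (deploying, calling APIs, querying a database), define your own tools and pass them in through customTools. They work alongside the default tools: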
import { Type } from "@mariozechner/pi-ai";
import type { AgentTool } from "@mariozechner/pi-agent-core";
const deployParams = Type.Object({
environment: Type.String({ description: "Target environment", default: "staging" }),
});
const deployTool: AgentTool<typeof deployParams> = {
name: "deploy",
label: "Deploy",
description: "Deploy the application to the target environment",
parameters: deployParams,
execute: async (_id, params, signal, onUpdate) => {
onUpdate?.({
content: [{ type: "text", text: `Deploying to ${params.environment}...` }],
details: {},
});
// Add your own logic here - e.g., call an API, run a script, or trigger a CI event
await new Promise((resolve) => setTimeout(resolve, 2000));
return {
content: [{ type: "text", text: `Deployed to ${params.environment} successfully.` }],
details: { environment: params.environment, timestamp: Date.now() },
};
},
};
const { session } = await createAgentSession({
model,
customTools: [deployTool],
sessionManager: SessionManager.inMemory(),
});
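The Agent now has read, write, edit, and bash plus deploy.
Compaction
Long conversations easily exceed the model's context window. pi-coding-agent handles this with compaction: it summarizes older messages while keeping recent ones: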
import { estimateTokens } from "@mariozechner/pi-coding-agent";
// Check how many tokens the conversation uses
const totalTokens = session.messages.reduce(
(sum, msg) => sum + estimateTokens(msg),
0
);
// Manually trigger compaction - the optional string guides what the summary should preserve
if (totalTokens > 100_000) {
await session.compact("Preserve all file paths and code changes.");
}
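By default, createAgentSession enables auto-compaction, which triggers when the context approaches the model's window limit. The full message history stays in the JSONL file; only the in-memory context is compacted.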
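The general shape of compaction can be sketched without any library. The version below is an illustration under explicit assumptions: the 4-characters-per-token estimate, the "keep what fits in half the budget" rule, and the names Msg, roughTokens, and compact are all invented for this example and are not how pi-coding-agent decides what to keep.

```typescript
type Msg = { role: "user" | "assistant" | "summary"; text: string };

// Rough heuristic: about 4 characters per token. An assumption, not pi's estimateTokens.
const roughTokens = (m: Msg): number => Math.ceil(m.text.length / 4);

// Replace older messages with one summary once the history exceeds the token budget.
function compact(messages: Msg[], budget: number, summarize: (old: Msg[]) => string): Msg[] {
  const total = messages.reduce((sum, m) => sum + roughTokens(m), 0);
  if (total <= budget) return messages;

  // Always keep the newest message, then keep older ones while they fit in half the budget.
  let cut = messages.length - 1;
  let kept = roughTokens(messages[cut]);
  while (cut > 0 && kept + roughTokens(messages[cut - 1]) <= budget / 2) {
    kept += roughTokens(messages[cut - 1]);
    cut--;
  }

  const old = messages.slice(0, cut);
  if (old.length === 0) return messages; // nothing older to summarize
  return [{ role: "summary", text: summarize(old) }, ...messages.slice(cut)];
}

const history: Msg[] = [
  { role: "user", text: "x".repeat(400) },      // ~100 tokens
  { role: "assistant", text: "y".repeat(400) }, // ~100 tokens
  { role: "user", text: "z".repeat(40) },       // ~10 tokens
];

const compacted = compact(history, 120, (old) => `Summary of ${old.length} earlier messages`);
console.log(compacted.map((m) => m.role).join(", "));
// prints "summary, user"
```

In a real session the summarize callback would be an LLM call, and the original messages would remain on disk in the JSONL file.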
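Extensions
Tools let the LLM do things. Extensions let you change how the Agent behaves, without the LLM knowing anything about it.
Extensions hook into lifecycle events fired during the Agent loop: before messages are sent to the LLM, before compaction runs, when a tool is called, when a session starts. The LLM never sees any trace of extensions in its context; they work entirely behind the scenes.
You can put all kinds of logic here: pruning old tool results to keep the context window focused, replacing the default compaction with a custom summarization pipeline, gating tool calls on permissions, or injecting extra context based on the current state of the conversation.
An extension is a TypeScript module that exports a function taking an ExtensionAPI: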
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
export default function myExtension(api: ExtensionAPI): void {
// Fires before every LLM call. Lets you rewrite the message array.
api.on("context", (event, ctx) => {
const pruned = event.messages.filter((msg) => {
// Drop large tool results older than 10 messages
if (msg.role === "toolResult" && event.messages.indexOf(msg) < event.messages.length - 10) {
const text = msg.content.map((c) => (c.type === "text" ? c.text : "")).join("");
if (text.length > 5000) return false;
}
return true;
});
return { messages: pruned };
});
// Replace the default compaction with your own summarization logic
api.on("session_before_compact", async (event, ctx) => {
const summary = await myCustomSummarize(event.messages);
return { compaction: { summary, firstKeptEntryId: event.firstKeptEntryId, tokensBefore: event.tokensBefore } };
});
// Register a user-facing command (not an LLM tool)
api.registerCommand("stats", {
description: "Show session statistics",
handler: async (_args, ctx) => {
const stats = ctx.session.getSessionStats();
console.log(`Messages: ${stats.totalMessages}, Cost: $${stats.cost.toFixed(4)}`);
},
});
}
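The key extension events include context (rewrite messages before the LLM sees them), session_before_compact (custom summarization), tool_call (intercept or gate tool calls), before_agent_start (inject context or modify the prompt), and session_start/session_switch (react to session changes).
OpenClaw uses extensions for context pruning (silently trimming oversized tool results to save tokens) and compaction safeguards (replacing pi's default summarization with a multi-stage pipeline that preserves file-operation history and tool-failure data).
Building something useful
Here is a complete example that combines all three layers: a codebase assistant. It can read your project, answer questions, make changes, and remember conversations across restarts.
Create assistant.ts: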
import {
createAgentSession,
SessionManager,
estimateTokens,
} from "@mariozechner/pi-coding-agent";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
import { Type } from "@mariozechner/pi-ai";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import * as path from "path";
import * as fs from "fs";
import * as readline from "readline";
// --- Custom tool: search the web ---
const webSearchParams = Type.Object({
query: Type.String({ description: "Search query" }),
});
const webSearchTool: AgentTool<typeof webSearchParams> = {
name: "web_search",
label: "Web Search",
description: "Search the web for documentation, error messages, or general information",
parameters: webSearchParams,
execute: async (_id, params) => {
// In a real setup, call an actual search API (Brave, Serper, etc.)
return {
content: [{ type: "text", text: `[Search results for: "${params.query}" would appear here]` }],
details: { query: params.query },
};
},
};
// --- Session persistence ---
const sessionDir = path.join(process.cwd(), ".sessions");
fs.mkdirSync(sessionDir, { recursive: true });
const sessionFile = path.join(sessionDir, "assistant.jsonl");
const sessionManager = SessionManager.open(sessionFile);
// --- Create the agent session ---
async function createAssistant() {
const model = getModel("minimax", "MiniMax-M2.5");
const { session } = await createAgentSession({
model,
thinkingLevel: "off",
sessionManager,
customTools: [webSearchTool],
});
session.agent.streamFn = streamSimple;
return session;
}
// --- Event handler ---
function attachEventHandlers(session: Awaited<ReturnType<typeof createAssistant>>) {
session.subscribe((event) => {
switch (event.type) {
case "message_update":
if (event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
}
break;
case "tool_execution_start":
console.log(`\n [${event.toolName}] ${summarizeArgs(event.args)}`);
break;
case "tool_execution_end":
if (event.isError) {
console.log(` ERROR`);
}
break;
case "auto_compaction_start":
console.log("\n [compacting context...]");
break;
case "agent_end":
console.log();
break;
}
});
}
function summarizeArgs(args: any): string {
if (args?.path) return args.path;
if (args?.command) return args.command.slice(0, 60);
if (args?.query) return `"${args.query}"`;
if (args?.pattern) return args.pattern;
return JSON.stringify(args).slice(0, 60);
}
// --- REPL ---
async function main() {
const session = await createAssistant();
attachEventHandlers(session);
const tokenCount = session.messages.reduce((sum, msg) => sum + estimateTokens(msg), 0);
console.log("PI Assistant");
console.log(` Model: ${session.model?.id}`);
console.log(` Session: ${sessionFile}`);
console.log(` History: ${session.messages.length} messages, ~${tokenCount} tokens`);
console.log(` Tools: ${session.getActiveToolNames().join(", ")}`);
console.log(` Type "exit" to quit, "new" to reset session\n`);
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
const ask = () => {
rl.question("You: ", async (input) => {
const trimmed = input.trim();
if (trimmed === "exit") {
session.dispose();
rl.close();
return;
}
if (trimmed === "new") {
await session.newSession();
console.log("Session reset.\n");
ask();
return;
}
if (!trimmed) {
ask();
return;
}
try {
await session.prompt(trimmed);
} catch (err: any) {
console.error(`Error: ${err.message}`);
}
ask();
});
};
ask();
}
main();
Run it:
npx tsx assistant.ts
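The code above, roughly 120 lines, implements a persistent coding assistant. It can read files, run commands, edit code, and search the web (the web_search tool here is a stub), and it remembers your conversation across restarts. The session tree in the JSONL file keeps the full history even after compaction.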
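How OpenClaw adapts this for production
OpenClaw follows exactly the same pattern, with more layers added for production:
Multi-provider authentication
OpenClaw does not rely on a single ANTHROPIC_API_KEY or MINIMAX_API_KEY. It uses AuthStorage and ModelRegistry to manage credentials across multiple providers, including OAuth flows: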
import { AuthStorage, ModelRegistry } from "@mariozechner/pi-coding-agent";
const authStorage = AuthStorage.create(path.join(agentDir, "auth.json"));
const modelRegistry = new ModelRegistry(authStorage, modelsConfigPath);
const { session } = await createAgentSession({
authStorage,
modelRegistry,
model: modelRegistry.find("ollama", "llama3.1:8b"),
// ...
});
AuthStorage reads from auth.json, a flat object keyed by provider name, where each value is either an API key or an OAuth credential:
{
"anthropic": {"type":"api_key","key":"sk-ant-..."},
"openai": {"type":"api_key","key":"sk-..."},
"minimax": {"type":"api_key","key":"sk-api-..."},
"devin": {"type":"api_key","key":"cog_..."},
"github-copilot": {
"type":"oauth",
"refresh":"gho_xxxxxxxxxxxx",
"access":"ghu_yyyyyyyyyyyy",
"expires":1700000000000
}
}
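The key field can be a literal value, the name of an environment variable, or a shell command prefixed with ! (for example, "!op read 'op://vault/openai/key'" to call 1Password). Expired OAuth tokens are refreshed automatically.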
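One plausible way to resolve those three forms is sketched below. resolveKey is a hypothetical function, and the lookup order (shell command first, then an environment variable of that name, then the literal value) is an assumption for illustration; the article does not specify how AuthStorage disambiguates them.

```typescript
import { execSync } from "node:child_process";

// Hypothetical resolver for the three forms of `key`. The lookup order is an assumption.
function resolveKey(
  key: string,
  env: Record<string, string | undefined> = process.env,
): string {
  if (key.startsWith("!")) {
    // Shell command: use its trimmed stdout as the secret.
    return execSync(key.slice(1), { encoding: "utf-8" }).trim();
  }
  // Use the environment variable of that name if it is set; otherwise treat it as a literal.
  return env[key] ?? key;
}

console.log(resolveKey("COMPANY_LLM_KEY", { COMPANY_LLM_KEY: "value-from-env" }));
// prints "value-from-env"
console.log(resolveKey("sk-literal-value", {}));
// prints "sk-literal-value"
```

Running a shell command from a config file is powerful but means auth.json must be trusted like any other executable script.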
ModelRegistry reads models.json, which defines custom providers and models. This file is how you add self-hosted models or providers that pi does not support out of the box:
{
"providers": {
"ollama": {
"baseUrl": "http://localhost:11434/v1",
"api": "openai-completions",
"apiKey": "ollama",
"models": [
{"id": "llama3.1:8b"},
{"id": "qwen2.5-coder:7b"}
]
},
"my-company-api": {
"baseUrl": "https://llm.internal.company.com/v1",
"api": "openai-completions",
"apiKey": "COMPANY_LLM_KEY",
"authHeader": true,
"models": [
{"id": "internal-model-v2"}
]
}
}
}
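Models defined here appear alongside the built-in catalog. modelRegistry.find("ollama", "llama3.1:8b") returns a fully typed Model that can be passed straight to createAgentSession.
Stream middleware
session.agent.streamFn is the function the Agent calls when it needs to talk to the LLM. It defaults to streamSimple, but you can wrap it to inject headers, adjust parameters, or add per-provider logging.
OpenClaw uses this to add OpenRouter attribution headers and to enable Anthropic prompt caching: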
import { streamSimple } from "@mariozechner/pi-ai";
import type { StreamFn } from "@mariozechner/pi-agent-core";
const wrappedStreamFn: StreamFn = (model, context, options) => {
const extraHeaders: Record<string, string> = {};
// OpenRouter uses these for their public app rankings/leaderboard
if (model.provider === "openrouter") {
extraHeaders["X-Title"] = "My App";
extraHeaders["HTTP-Referer"] = "https://myapp.com";
}
return streamSimple(model, context, {
...options,
headers: { ...options?.headers, ...extraHeaders },
cacheRetention: model.provider === "anthropic" ? "long" : "none",
});
};
session.agent.streamFn = wrappedStreamFn;
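Tool customization
The default built-in tools operate on process.cwd(), which is fine for a local CLI. In a multi-user product like OpenClaw, though, each Agent session must be confined to a specific workspace directory so users cannot read or write outside their own project. OpenClaw uses the tool factories to rebuild the file tools against the workspace root, keeping the same tool behavior while scoping every path: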
import {
codingTools,
readTool,
createReadTool,
createWriteTool,
createEditTool,
} from "@mariozechner/pi-coding-agent";
import type { AgentTool } from "@mariozechner/pi-agent-core";
function buildTools(workspace: string): AgentTool[] {
return (codingTools as AgentTool[]).map((tool) => {
if (tool.name === readTool.name) {
return createReadTool(workspace);
}
if (tool.name === "write") {
return createWriteTool(workspace);
}
if (tool.name === "edit") {
return createEditTool(workspace);
}
return tool; // bash stays as-is
});
}
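If you add your own permission middleware around such tools, the core check is usually a path-containment test. The sketch below shows one common way to write it with Node's path module; resolveInWorkspace is a hypothetical helper, and this is not a claim about what createReadTool or OpenClaw do internally. It also does not follow symlinks, which a hardened version would need to handle.

```typescript
import * as path from "node:path";

// Resolve a tool-supplied path and reject anything that escapes the workspace root.
// Hypothetical helper for illustration; symlinks are not resolved here.
function resolveInWorkspace(workspace: string, requested: string): string {
  const root = path.resolve(workspace);
  const target = path.resolve(root, requested);
  const rel = path.relative(root, target);

  const escapes = rel === ".." || rel.startsWith(".." + path.sep) || path.isAbsolute(rel);
  if (escapes) {
    throw new Error(`Path escapes workspace: ${requested}`);
  }
  return target;
}

// A relative path inside the workspace resolves normally.
console.log(resolveInWorkspace("/srv/workspaces/alice", "src/index.ts"));

// Traversal attempts are rejected.
try {
  resolveInWorkspace("/srv/workspaces/alice", "../bob/secrets.txt");
} catch (err) {
  console.log((err as Error).message);
}
```

Comparing the relative path rather than using a plain string prefix check avoids treating /srv/workspaces/alice-evil as if it were inside /srv/workspaces/alice.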
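Event routing
While the Agent runs it emits a steady stream of events: text tokens arriving, tool calls starting and finishing, the Agent completing its turn. In a terminal app you just print these to stdout.
OpenClaw's users, however, chat through Telegram, Discord, or Slack, so it has to turn these events into platform-specific messages. session.subscribe() gives you a callback for every event, and how you handle each one is entirely up to you: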
session.subscribe((event) => {
switch (event.type) {
case "message_update":
if (event.assistantMessageEvent.type === "text_delta") {
// Tokens arrive one at a time - buffer them, then send as one message
messageBuffer.append(event.assistantMessageEvent.delta);
}
break;
case "tool_execution_start":
// Send tool call notification to the channel
channel.sendNotification(`Running ${event.toolName}...`);
break;
case "agent_end":
// Flush remaining buffered text
messageBuffer.flush();
break;
}
});
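The snippet above leaves messageBuffer undefined. A minimal version might look like the following sketch, assuming a send callback that posts one chat message and a per-message length cap; the MessageBuffer class, its constructor arguments, and the cap value are all hypothetical and not taken from OpenClaw.

```typescript
// Hypothetical buffer: collects streamed deltas and emits them as whole messages.
class MessageBuffer {
  private text = "";
  private readonly send: (message: string) => void;
  private readonly maxLength: number;

  constructor(send: (message: string) => void, maxLength = 2000) {
    this.send = send;
    this.maxLength = maxLength;
  }

  append(delta: string): void {
    this.text += delta;
    // Chat platforms cap message length, so emit full chunks as they fill up.
    while (this.text.length >= this.maxLength) {
      this.send(this.text.slice(0, this.maxLength));
      this.text = this.text.slice(this.maxLength);
    }
  }

  // Send whatever is left, e.g. on agent_end.
  flush(): void {
    if (this.text.length > 0) {
      this.send(this.text);
      this.text = "";
    }
  }
}

const sent: string[] = [];
const messageBuffer = new MessageBuffer((m) => sent.push(m), 10);
for (const delta of ["Hello", ", ", "wor", "ld", "!"]) messageBuffer.append(delta);
messageBuffer.flush();
console.log(sent);
// prints [ 'Hello, wor', 'ld!' ]
```

A production version would more likely split on sentence or paragraph boundaries and rate-limit sends, since most chat APIs throttle frequent messages.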
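Adding a terminal UI (TUI)
The assistant.ts example uses readline for input. It works, but it cannot render Markdown, has no autocomplete, and streams through raw process.stdout.write. pi-tui replaces all of that with a proper terminal UI: Markdown with syntax highlighting, an editor with slash-command and file-path autocomplete, a loading spinner, and flicker-free differential rendering.
Here is the same assistant upgraded with pi-tui. Create assistant-tui.ts: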
import {
createAgentSession,
SessionManager,
estimateTokens,
} from "@mariozechner/pi-coding-agent";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
import { Type } from "@mariozechner/pi-ai";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import {
TUI,
ProcessTerminal,
Editor,
Markdown,
Text,
Loader,
CombinedAutocompleteProvider,
} from "@mariozechner/pi-tui";
import type { EditorTheme, MarkdownTheme } from "@mariozechner/pi-tui";
import chalk from "chalk";
import * as path from "path";
import * as fs from "fs";
// --- Themes ---
const markdownTheme: MarkdownTheme = {
heading: (s) => chalk.bold.cyan(s),
link: (s) => chalk.blue(s),
linkUrl: (s) => chalk.dim(s),
code: (s) => chalk.yellow(s),
codeBlock: (s) => chalk.green(s),
codeBlockBorder: (s) => chalk.dim(s),
quote: (s) => chalk.italic(s),
quoteBorder: (s) => chalk.dim(s),
hr: (s) => chalk.dim(s),
listBullet: (s) => chalk.cyan(s),
bold: (s) => chalk.bold(s),
italic: (s) => chalk.italic(s),
strikethrough: (s) => chalk.strikethrough(s),
underline: (s) => chalk.underline(s),
};
const editorTheme: EditorTheme = {
borderColor: (s) => chalk.dim(s),
selectList: {
selectedPrefix: (s) => chalk.blue(s),
selectedText: (s) => chalk.bold(s),
description: (s) => chalk.dim(s),
scrollInfo: (s) => chalk.dim(s),
noMatch: (s) => chalk.dim(s),
},
};
// --- Custom tool ---
const webSearchParams = Type.Object({
query: Type.String({ description: "Search query" }),
});
const webSearchTool: AgentTool<typeof webSearchParams> = {
name: "web_search",
label: "Web Search",
description: "Search the web for documentation, error messages, or general information",
parameters: webSearchParams,
execute: async (_id, params) => ({
content: [{ type: "text", text: `[Search results for: "${params.query}" would appear here]` }],
details: { query: params.query },
}),
};
// --- Session persistence ---
const sessionDir = path.join(process.cwd(), ".sessions");
fs.mkdirSync(sessionDir, { recursive: true });
const sessionFile = path.join(sessionDir, "assistant.jsonl");
// --- TUI setup ---
const tui = new TUI(new ProcessTerminal());
tui.addChild(new Text(chalk.bold("PI Assistant") + chalk.dim(" (Ctrl+C to exit)\n")));
const editor = new Editor(tui, editorTheme);
editor.setAutocompleteProvider(
new CombinedAutocompleteProvider(
[
{ name: "new", description: "Reset the session" },
{ name: "exit", description: "Quit the assistant" },
],
process.cwd(),
),
);
tui.addChild(editor);
tui.setFocus(editor);
// --- Main ---
async function main() {
const model = getModel("minimax", "MiniMax-M2.5");
const sessionManager = SessionManager.open(sessionFile);
const { session } = await createAgentSession({
model,
thinkingLevel: "off",
sessionManager,
customTools: [webSearchTool],
});
session.agent.streamFn = streamSimple;
// Show session info
const tokenCount = session.messages.reduce((sum, msg) => sum + estimateTokens(msg), 0);
const children = tui.children;
children.splice(children.length - 1, 0, new Text(
chalk.dim(` Model: ${model.id}\n`) +
chalk.dim(` Session: ${sessionFile}\n`) +
chalk.dim(` History: ${session.messages.length} messages, ~${tokenCount} tokens\n`) +
chalk.dim(` Tools: ${session.getActiveToolNames().join(", ")}\n`),
));
tui.requestRender();
// Streaming state
let streamingMarkdown: Markdown | null = null;
let streamingText = "";
let loader: Loader | null = null;
let isRunning = false;
// Subscribe to agent events
session.subscribe((event) => {
switch (event.type) {
case "agent_start":
isRunning = true;
editor.disableSubmit = true;
loader = new Loader(tui, (s) => chalk.cyan(s), (s) => chalk.dim(s), "Thinking...");
children.splice(children.length - 1, 0, loader);
tui.requestRender();
break;
case "message_update":
if (event.assistantMessageEvent.type === "text_delta") {
// Remove loader on first text
if (loader) {
tui.removeChild(loader);
loader = null;
}
// Create or update the streaming markdown component
streamingText += event.assistantMessageEvent.delta;
if (!streamingMarkdown) {
streamingMarkdown = new Markdown(streamingText, 1, 0, markdownTheme);
children.splice(children.length - 1, 0, streamingMarkdown);
} else {
streamingMarkdown.setText(streamingText);
}
tui.requestRender();
}
break;
case "tool_execution_start": {
if (loader) {
tui.removeChild(loader);
loader = null;
}
const args = event.args?.path || event.args?.command?.slice(0, 60) || event.args?.query || "";
const toolMsg = new Text(chalk.dim(` [${event.toolName}] ${args}`));
children.splice(children.length - 1, 0, toolMsg);
tui.requestRender();
break;
}
case "agent_end":
if (loader) {
tui.removeChild(loader);
loader = null;
}
streamingMarkdown = null;
streamingText = "";
isRunning = false;
editor.disableSubmit = false;
tui.requestRender();
break;
}
});
// Handle input submission
editor.onSubmit = async (value: string) => {
if (isRunning) return;
const trimmed = value.trim();
if (!trimmed) return;
if (trimmed === "/exit") {
session.dispose();
tui.stop();
process.exit(0);
}
if (trimmed === "/new") {
await session.newSession();
children.splice(2, children.length - 3); // Keep header, info, and editor
children.splice(children.length - 1, 0, new Text(chalk.dim(" Session reset.\n")));
tui.requestRender();
return;
}
// Add user message to chat
const userMsg = new Markdown(value, 1, 0, markdownTheme, (s) => chalk.bold(s));
children.splice(children.length - 1, 0, userMsg);
tui.requestRender();
// Send to agent
try {
await session.prompt(trimmed);
} catch (err: any) {
children.splice(children.length - 1, 0, new Text(chalk.red(`Error: ${err.message}`)));
editor.disableSubmit = false;
tui.requestRender();
}
};
tui.start();
}
main();
Run it:
npx tsx assistant-tui.ts
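Compared with the readline version, the main differences are:
- • Markdown rendering. The Agent's replies are no longer plain text dumped to stdout; they are rendered with syntax-highlighted code blocks, bold, italics, lists, and links.
- • Streaming through setText. As tokens arrive, we append to a string and call streamingMarkdown.setText(). The TUI's differential renderer updates only the lines that changed: no flicker, no screen clearing.
- • Editor with autocomplete. Typing / opens a dropdown of slash commands. A keypress completes file paths. Shift+Enter inserts a new line.
- • Loading spinner. The Loader component shows a spinner while the Agent is thinking and goes away as soon as text starts streaming.
- • No manual cursor management. The TUI handles terminal state, cursor positioning, and cleanup itself. No more process.stdout.write calls scattered through the event handlers.
The architecture is unchanged: createAgentSession + session.subscribe() + session.prompt(). The only difference is how events are rendered: instead of writing to stdout, you add and update Markdown, Text, and Loader components in the TUI's component tree.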
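Going further
This guide covered the four packages you need to build a terminal-based Agent.
The remaining packages in pi-mono extend the system in other directions:
- • pi-web-ui - Lit web components for browser chat interfaces. Provides a ready-to-use ChatPanel component with streaming, file attachments, and artifact rendering (HTML/SVG/Markdown in a sandboxed iframe).
- • pi-mom - A Slack bot that delegates messages to pi-coding-agent. Supports per-channel Agent isolation, Docker sandboxing, scheduled events, and self-managed tool installation.
- • pi-pods - A CLI for deploying open-source models on GPU pods via vLLM. Supports DataCrunch, RunPod, Vast.ai, and bare metal. Each deployed model exposes an OpenAI-compatible endpoint that pi-ai can use directly.
The pi-coding-agent docs cover the full extension API, the skills system, and CLI usage. AGENTS.md in pi-mono has detailed instructions for adding a new LLM provider.
With a framework like this and your own domain expertise, I hope you can build a lobster of your own.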