OpenClaw可靠消息投递源码深度解析
1. 知识回顾
上一讲我们深入剖析了 OpenClaw 中 heartbeat 和 cron 两种异步任务调度机制,其核心特性如下:
- 异步任务在后台持续执行,不阻塞主逻辑。
- 用户触发提问时,系统会立即切换优先级,暂停 heartbeat 等后台任务。
- cron 任务以低优先级运行,对主流程无感知影响。
本讲聚焦 OpenClaw 最棘手的工程难题:当网络闪断或进程异常终止时,正在回复的消息如何确保不丢失?常见的做法是直接丢弃,但用户体验无法接受。OpenClaw 的解决方案是引入可靠消息投递机制,彻底杜绝消息丢失的风险。
2. 总体设计思路
OpenClaw的设计哲学极其简洁:消息必须落盘,才能发送。
- 消息必须先持久化到本地磁盘,即使断电或进程崩溃,数据仍可从存储层恢复。
- 落盘后,由后台线程异步尝试发送。发送失败则自动重试,直至成功或达到重试次数上限。
这种“先写后发”的模式,是整个可靠性体系的基石。
实现细节
3.1 消息分片
大模型生成的回复通常较长,但不同平台对单条消息长度有严格限制。因此第一步根据平台规则切分消息:
- 如果回复长度在平台限制内,则不分片,直接投递。
- 超出限制时,按平台阈值切割为多个片段,按顺序依次投递。
3.2 生成任务对象并持久化入队
分片完成后,可靠性保障正式启动。具体流程如下:
- 系统生成唯一的
delivery_id(取UUID前12位,冲突概率极低)。 - 构造
QueuedDelivery任务对象,包含通道、接收方、消息内容、入队时间、下次重试时间等关键字段。 - 调用
_write_entry()将任务对象原子写入磁盘。
下面这段核心代码演示了原子写入的完整流程:通过临时文件 + 原子替换,确保写入过程中即使断电或崩溃,文件也不会损坏。
复制代码def _write_entry(self, entry: QueuedDelivery) -> None:
"""通过 tmp + os.replace() 保证原子写入,避免数据损坏"""
final_path = self.queue_dir / f"{entry.id}.json"
tmp_path = self.queue_dir / f".tmp.{os.getpid()}.{entry.id}.json" data = json.dumps(entry.to_dict(), indent=2, ensure_ascii=False)
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(data)
f.flush()
os.fsync(f.fileno()) # 强制落盘
os.replace(str(tmp_path), str(final_path)) # 原子替换
消息写入磁盘后,即使进程崩溃,数据依然安全。这才是杜绝丢失的根本手段。
复制代码delivery_id = uuid.uuid4().hex[:12]
entry = QueuedDelivery(
id=delivery_id,
channel=channel,
to=to,
text=text,
enqueued_at=time.time(),
next_retry_at=0.0, # 立即重试
)
self._write_entry(entry) # 落盘后才返回
return delivery_id # 返回 ID 给上层
3.3 后台线程扫描与重试
系统启动时,后台线程主动扫描 pending(待发送)和 failed(失败)队列,确保无残留任务。即使服务重启,所有未完成的任务也能自动恢复。
复制代码pending = self.queue.load_pending()
failed = self.queue.load_failed()# 打印恢复状态
if pending:
print_delivery(f"Recovery: {len(pending)} pending")
if failed:
print_delivery(f"Recovery: {len(failed)} failed")
启动后,后台线程轮询 pending 队列,逐个尝试投递。
复制代码for entry in pending:
# 1. 检查是否到达下一次重试时间
if entry.next_retry_at > now:
continue # 2. 尝试投递
try:
self.deliver_fn(entry.channel, entry.to, entry.text)
self.queue.ack(entry.id) # 投递成功,确认删除
self.total_succeeded += 1
except Exception as exc:
# 3. 投递失败,记录失败并更新重试次数
self.queue.fail(entry.id, str(exc))
self.total_failed += 1 # 4. 判断是否超过最大重试次数(MAX_RETRIES)
if entry.retry_count + 1 >= MAX_RETRIES:
print_warn(f"Delivery {entry.id} -> failed (最终失败)")
else:
# 进入指数退避等待
backoff = compute_backoff_ms(entry.retry_count + 1)
print_warn(f"next retry in {backoff / 1000:.0f}s")
这里有一个关键设计:为防止某条消息持续失败而独占重试资源,OpenClaw 采用带随机抖动的指数退避算法。具体来说——首次失败后等待5秒,第二次25秒,第三次2分钟,第四次10分钟。每次等待时间还会叠加±20%的随机抖动,避免大量消息在同一时刻重试引发网络拥塞。
复制代码# 基础退避时间序列(毫秒)
BACKOFF_MS = [5_000, 25_000, 120_000, 600_000] # 5s, 25s, 2min, 10mindef compute_backoff_ms(retry_count: int) -> int:
if retry_count <= 0:
return 0 # 根据重试次数选择对应的基础退避档位(超出则取最大)
idx = min(retry_count - 1, len(BACKOFF_MS) - 1)
base = BACKOFF_MS[idx] # 加入随机抖动(±20%),避免大量消息同时重试造成网络风暴
jitter = random.randint(-base // 5, base // 5)
return max(0, base + jitter)
4. 知识总结
| 模块 | 作用 |
|---|---|
| 消息分片 | 适配各平台消息长度限制,规避投递失败 |
| 原子写入 | 采用 tmp+ os.replace 模式,防止崩溃致文件损坏 |
| 持久化队列 | 内存队列配合磁盘存储,重启后任务完整复原 |
| 指数退避重试 | 失败后等待时间指数递增(含随机抖动),保护服务端免受过载 |
| 后台线程扫描 | 服务启动时自动恢复未完成任务,保障高可用 |
