TimechoAI上手指南:时序数据预测分析全攻略
时序分析项目里,真正消耗精力的往往不是绘制曲线,而是曲线背后那些更棘手的问题:未来几个时序点的走向如何判断,异常是否才刚冒头,换一组设备数据后是否需要从头再搭一套模型。TimechoAI 正好切入这个场景。官方产品页将其定位为“基于 Timer 系列时序大模型”,提供自然语言问答、趋势预测、异常检测,以及 Web 界面、REST API、Python SDK 等多种接入方式。
先构造一份能跑通流程的样本数据
直接对接真实设备数据是最终目标,但上手阶段建议先用一段包含周期波动与随机噪声的样例数据,把“数据清洗、模型预测、结果绘图、输出保存”这几个环节完整跑一遍。
import numpy as np
import pandas as pd
rng = np.random.default_rng(42)
time_index = pd.date_range("2026-06-01 00:00:00", periods=7 * 24 * 4, freq="15min")
base = 55 + 8 * np.sin(np.arange(len(time_index)) / 12)
noise = rng.normal(0, 1.8, len(time_index))
pressure = base + noise
# 模拟一小段异常抬升
pressure[420:445] += np.linspace(0, 14, 25)
df = pd.DataFrame({
"time": time_index,
"pressure": pressure.round(3),
"motor_current": (12 + pressure / 18 + rng.normal(0, 0.3, len(time_index))).round(3),
"valve_opening": (45 + 12 * np.sin(np.arange(len(time_index)) / 30)).round(3),
})
df.to_csv("pump_pressure.csv", index=False)
print(df.head())
这份 CSV 包含一个目标变量 pressure,以及两个可选的协变量字段:motor_current 和 valve_opening。后续通过 TimechoAI 上传 CSV 时,建议先从纯目标变量预测入手,再逐步加入协变量来对比效果。
上传前先对 CSV 做本地清洗
官方文档说明 TimechoAI 支持手动输入、绘制曲线、上传 CSV,预测步数范围可设置为 1 到 720 步。上传之前,建议先做一次本地预检,重点关注时间戳格式、缺失值分布和采样间隔一致性。
from pathlib import Path
csv_path = Path("pump_pressure.csv")
df = pd.read_csv(csv_path)
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values("time").drop_duplicates("time")
series = (df.set_index("time")["pressure"].asfreq("15min"))
print("rows:", len(df))
print("missing:", int(series.isna().sum()))
print("start:", series.index.min())
print("end:", series.index.max())
series = series.interpolate(limit_direction="both")
history = series.tail(256).round(4).tolist()
如果缺失值集中在某一段区间内,不建议直接交给模型处理。插值操作会把真实异常趋势抹平,后续的预测和异常检测结果都会失真。
用目标变量跑一次预测流程
从 TimechoAI 官方产品截图可以看到预测分析界面:完成预测后会展示推理耗时、预测点数、目标变量曲线与预测曲线,并提供导出功能入口。
网页端可以按这个顺序操作:新增一段时序数据,选定目标变量,设置预测步数,观察输出曲线是否符合业务预期。官方界面中包含了“预测参数设置”“添加时序数据”“上传文件”等入口,说明平台不仅支持代码调用,也能先通过页面做低成本验证。建议让懂设备业务的人员先看曲线,他们比开发更快判断某段预测是否明显偏离实际情况。
预测步数不要随意填写。15 分钟采样间隔下,预测 32 步对应未来约 8 小时;预测 96 步则对应未来约 1 天。采样粒度一旦变化,预测步数的业务含义也会随之改变。很多时序项目出问题,不是因为模型接口没调通,而是“步数”这个参数在业务层面没有定义清楚。
如果通过 REST API 接入,接口路径和字段名以平台官方文档为准。代码里建议把路径做成环境变量,避免将未确认的 endpoint 硬编码。
import os
import requests
BASE_URL = os.getenv("TIMECHOAI_BASE_URL", "https://ai.timecho.com")
API_KEY = os.environ["TIMECHOAI_API_KEY"]
FORECAST_PATH = os.getenv("TIMECHOAI_FORECAST_PATH", "/")
payload = {
"mode": "target_only",
"horizon": 32,
"time_unit": "15min",
"target": history,
}
resp = requests.post(
BASE_URL.rstrip("/") + FORECAST_PATH,
headers={"Authorization": f"Bearer {API_KEY}"},
json=payload,
timeout=30,
)
resp.raise_for_status()
result = resp.json()
print(result.keys())
在网页端体验时,本质上完成了三个动作:准备历史数据点、设置预测步长、查看预测曲线。接口接入时也尽量保持这个结构,不要把数据清洗、模型调用、结果入库混在同一个函数里。
def build_target_only_payload(values, horizon=32, time_unit="15min"):
return {
"mode": "target_only",
"horizon": horizon,
"time_unit": time_unit,
"target": [float(v) for v in values],
}
payload = build_target_only_payload(history, horizon=32)
实际接入时,建议把调用封装成一个轻薄的客户端类。这样后续接口路径、鉴权头、超时策略有变化时,只需改动一处。
class TimechoAIClient:
def __init__(self, base_url, api_key, timeout=30):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
def post(self, path, payload):
resp = requests.post(
self.base_url + path,
headers={"Authorization": f"Bearer {self.api_key}"},
json=payload,
timeout=self.timeout,
)
resp.raise_for_status()
return resp.json()
client = TimechoAIClient(BASE_URL, API_KEY)
result = client.post(FORECAST_PATH, payload)
这个封装不做复杂设计,只处理三件事:拼接 URL、携带 API Key、检查 HTTP 状态码。预测逻辑、数据清洗、结果保存都不要塞进这个类里,否则后续排查会变得困难。
加协变量之前,先确认它确实有贡献
官方文档提到 TimechoAI 支持三种预测模式:纯目标变量、历史协变量、未来协变量。这一点很关键,因为很多时序曲线的变化并非孤立事件。
例如压力曲线可能受电机电流和阀门开度影响;能耗曲线可能关联产量、班次、温度等变量。协变量不是字段堆得越多越好,应该先评估相关性和业务可解释性。
features = df.set_index("time")[["pressure", "motor_current", "valve_opening"]].asfreq("15min")
features = features.interpolate(limit_direction="both")
print(features.corr(numeric_only=True)["pressure"].sort_values(ascending=False))
构造带协变量的 payload 时,把目标变量和辅助变量分开处理。历史协变量只放已经发生的过去数据;未来协变量只放预测窗口内确实能提前获知的数据。
window = features.tail(256)
payload = {
"mode": "with_covariates",
"horizon": 32,
"time_unit": "15min",
"target": window["pressure"].round(4).tolist(),
"history_covariates": {
"motor_current": window["motor_current"].round(4).tolist(),
"valve_opening": window["valve_opening"].round(4).tolist(),
},
"future_covariates": {}
}
如果后续能拿到排产计划、气象预报、节假日标记这类未来已知信息,再放入 future_covariates。无法获取未来值的字段,不要强行塞入。
协变量有一个容易忽略的限制:长度和时间轴必须对齐。目标变量是 15 分钟一个点,协变量也需要按相同粒度处理。某个字段如果是小时级数据,最好先重采样到 15 分钟,再决定采用前向填充还是插值。这个步骤看似不起眼,但会直接影响模型对输入关系的理解。
hourly_weather = pd.DataFrame({
"time": pd.date_range("2026-06-01", periods=7 * 24, freq="1h"),
"temperature": 28 + np.sin(np.arange(7 * 24) / 6),
})
weather_15min = (hourly_weather.set_index("time")
.asfreq("15min")
.ffill())
aligned = features.join(weather_15min, how="left").ffill()
print(aligned.tail())
如果协变量本身也是预测值(比如天气预报),需要在调用记录里标明数据来源。后续主序列预测偏差变大时,才能判断是 TimechoAI 的模型结果不稳定,还是未来协变量本身已经偏离真实值。
预测结果一定要可视化查看
只看返回值很难判断模型是否跟住了趋势。把预测曲线、历史曲线、置信区间放在同一张图里,异常点和过度平滑的情况会一目了然。
import matplotlib.pyplot as plt
def plot_forecast(series, prediction, lower=None, upper=None, freq="15min"):
history = series.tail(96)
future_index = pd.date_range(
history.index[-1] + pd.Timedelta(freq),
periods=len(prediction),
freq=freq,
)
plt.figure(figsize=(12, 5))
plt.plot(history.index, history.values, label="history", linewidth=2)
plt.plot(future_index, prediction, label="forecast", linewidth=2)
if lower is not None and upper is not None:
plt.fill_between(future_index, lower, upper, alpha=0.18, label="interval")
plt.legend()
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
# prediction 字段名以实际返回为准
prediction = result.get("prediction", [])
plot_forecast(series, prediction)
这一步不要跳过。如果预测结果突然发散,或者预测区间快速展宽,说明后续的点只能当作趋势参考,不适合直接触发强决策动作。
需要将图片保存到报告或看板时,把 show() 替换为 savefig()。
def save_forecast_png(series, prediction, output="forecast.png"):
history = series.tail(96)
future_index = pd.date_range(
history.index[-1] + pd.Timedelta("15min"),
periods=len(prediction),
freq="15min",
)
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(history.index, history.values, label="history", linewidth=2)
ax.plot(future_index, prediction, label="forecast", linewidth=2)
ax.legend()
ax.grid(alpha=0.25)
fig.autofmt_xdate()
fig.tight_layout()
fig.savefig(output, dpi=160)
plt.close(fig)
保存图片还有一个好处:同一条曲线可以对比不同输入模式的效果。纯目标变量、加入历史协变量、加入未来协变量各保存一张图,哪个版本更稳定,肉眼很快就能判断出来。
数据集与模型管理也要同步关注
TimechoAI 官方页面还提供了数据集管理能力:内置工业、能源、金融、气象、医疗等真实数据集,支持原生 TsFile,支持多设备、多测点、多粒度数据组织。这些功能虽然不如预测按钮直观,但在实际项目中必不可少。
如果数据长期只靠临时 CSV 传递,很快就会变得混乱。建议先在本地把字段规范约定好,再接入平台的数据集管理流程。
schema = {
"time": "datetime64[ns]",
"pressure": "float64",
"motor_current": "float64",
"valve_opening": "float64",
}
for column, dtype in schema.items():
assert column in df.columns, f"missing column: {column}"
clean_df = df.astype({
"pressure": "float64",
"motor_current": "float64",
"valve_opening": "float64",
})
clean_df.to_csv("pump_pressure_clean.csv", index=False)
如果数据原先存储在 IoTDB 或其他时序数据库中,可以先用脚本拉取最近窗口,再转换为 TimechoAI 可接收的结构。下面用伪连接参数示意读取方式,实际主机、用户名、路径按自己的环境填写。
from iotdb.Session import Session
session = Session("127.0.0.1", 6667, "root", "root")
session.open(False)
sql = """
SELECT pressure, motor_current, valve_opening
FROM root.factory.pump_01
WHERE time >= 2026-06-01T00:00:00
"""
dataset = session.execute_query_statement(sql)
rows = []
while dataset.has_next():
row = dataset.next()
rows.append({
"time": row.get_timestamp(),
"pressure": row.get_fields()[0].get_float_value(),
"motor_current": row.get_fields()[1].get_float_value(),
"valve_opening": row.get_fields()[2].get_float_value(),
})
session.close()
iotdb_df = pd.DataFrame(rows)
这一步的目标不是绑定某一种数据库,而是把“数据来源”与“模型调用”解耦。只要数据能整理成稳定的时间戳和数值列,后续就能按同一套流程做预测。
模型管理也不是装饰功能。官方页面提到可以管理训练任务、配置模型与训练超参数,并记录模型版本、训练参数以及 Checkpoint 路径。如果只是单次体验,可能感受不到它的必要性;但一旦涉及不同测点、不同数据窗口、不同参数组合的反复尝试,就必须清楚每次结果来自哪一组配置。
本地系统也可以同步保留一份调用日志,方便后期排查。
CREATE TABLE forecast_call_log (
id BIGINT PRIMARY KEY,
measurement VARCHAR(128) NOT NULL,
horizon INT NOT NULL,
time_unit VARCHAR(32) NOT NULL,
request_hash VARCHAR(64) NOT NULL,
model_name VARCHAR(128),
model_version VARCHAR(128),
status VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL
);
调用之前可以计算一个请求哈希值,保存到日志表里。后续同一测点多次预测时,用哈希值就能快速判断输入窗口是否完全一致。
import hashlib
import json
def request_hash(payload):
raw = json.dumps(payload, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
payload_hash = request_hash(payload)
print(payload_hash)
异常检测可以和预测协同使用
固定阈值只能处理“超过设定值”的简单场景,渐变型异常更让人头疼。比如压力缓慢爬升、电流长期漂移,绝对值可能还没超限,但趋势已经发生偏移。Timer 官方页面提到了异常检测能力,TimechoAI 产品页也把异常检测列为核心功能之一。
接口字段仍以官方文档为准,结构可以这样拆分:
ANOMALY_PATH = os.getenv("TIMECHOAI_ANOMALY_PATH", "/")
payload = {
"series": series.tail(256).round(4).tolist(),
"window": 32,
"sensitivity": "medium",
}
resp = requests.post(
BASE_URL.rstrip("/") + ANOMALY_PATH,
headers={"Authorization": f"Bearer {API_KEY}"},
json=payload,
timeout=30,
)
resp.raise_for_status()
anomaly_result = resp.json()
异常检测的结果建议至少保存三类信息:异常分数、异常区间、原始数据片段。只保存一个“异常”标签,后续很难做复盘分析。
def collect_anomaly_points(series, scores, threshold=0.8):
rows = []
tail = series.tail(len(scores))
for ts, value, score in zip(tail.index, tail.values, scores):
if score >= threshold:
rows.append({
"time": ts.isoformat(),
"value": float(value),
"score": float(score),
})
return rows
scores = anomaly_result.get("scores", [])
anomaly_points = collect_anomaly_points(series, scores)
print(anomaly_points[:5])
异常检测和预测可以组合使用。当异常分数连续升高时,再触发一次短期预测,观察未来几个点是否继续抬升。这种方式比单纯看阈值更精细,也不会把每次小波动都当作故障处理。
def should_forecast_after_anomaly(scores, threshold=0.8, min_count=3):
recent = scores[-8:]
hit_count = sum(score >= threshold for score in recent)
return hit_count >= min_count
if should_forecast_after_anomaly(scores):
payload = build_target_only_payload(series.tail(256).tolist(), horizon=32)
forecast_after_anomaly = client.post(FORECAST_PATH, payload)
这个判断逻辑比较朴素,但适合作为第一版规则。后续如果误报较多,再把阈值、连续次数、设备状态、工况标签等因素加进来。
多条曲线不要逐一手动操作
真实场景中通常不是单条曲线,而是一批设备、一批测点。网页端适合先做效果验证,后续更适合通过脚本批量处理。
measurements = {
"pump_01.pressure": features["pressure"],
"pump_01.current": features["motor_current"],
"pump_01.valve": features["valve_opening"],
}
def build_batch_payload(measurements, horizon=32):
series_list = []
for name, s in measurements.items():
values = s.dropna().tail(256).round(4).tolist()
series_list.append({
"name": name,
"target": values,
})
return {
"mode": "batch_target_only",
"horizon": horizon,
"time_unit": "15min",
"series": series_list,
}
batch_payload = build_batch_payload(measurements)
批量任务最容易遗漏的是失败处理。某一条曲线缺失严重、长度不足、字段格式不对,都不应该影响其他曲线的预测。
def validate_series(name, values, min_points=64):
if len(values) < min_points:
return f"{name}: points less than {min_points}"
if any(pd.isna(values)):
return f"{name}: contains NaN"
return None
errors = []
for item in batch_payload["series"]:
err = validate_series(item["name"], item["target"])
if err:
errors.append(err)
print(errors)
批量调用的结果也要分开保存,不要只存一个完整的 JSON 文件,否则单条曲线回溯时查找困难。
def flatten_batch_result(batch_result):
rows = []
for item in batch_result.get("series", []):
name = item["name"]
for point in item.get("prediction", []):
rows.append({
"measurement": name,
"forecast_time": point["time"],
"predicted_value": point["value"],
"lower_bound": point.get("lower_bound"),
"upper_bound": point.get("upper_bound"),
})
return pd.DataFrame(rows)
把最小流程整合成一个脚本
前面几段代码分散在不同步骤中,实际运行时可以先整理成一个最小脚本。它只做四件事:读取 CSV、清洗时间轴、调用预测接口、保存结果。先把这个脚本跑稳定,再逐步加入协变量、异常检测和批量任务。
def load_series(path, column, freq="15min", tail=256):
df = pd.read_csv(path)
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values("time").drop_duplicates("time")
s = df.set_index("time")[column].asfreq(freq)
s = s.interpolate(limit_direction="both")
return s.tail(tail)
def save_prediction_csv(series, prediction, output="timechoai_prediction.csv"):
future_index = pd.date_range(
series.index[-1] + pd.Timedelta("15min"),
periods=len(prediction),
freq="15min",
)
out = pd.DataFrame({
"time": future_index,
"predicted_value": prediction,
})
out.to_csv(output, index=False)
return output
def main():
s = load_series("pump_pressure.csv", "pressure")
payload = build_target_only_payload(s.tolist(), horizon=32)
result = client.post(FORECAST_PATH, payload)
prediction = result.get("prediction", [])
save_prediction_csv(s, prediction)
save_forecast_png(s, prediction)
if __name__ == "__main__":
main()
这类脚本最适合放在验证阶段。它不追求复杂,但每一步都能单独检查:CSV 是否正确读取、时间轴是否已补齐、接口是否返回预测数组、结果文件能否被看板或报表正常读取。等这条链路跑通,再把 load_series() 替换为数据库读取,把 save_prediction_csv() 替换为写入业务库,代码结构也不会混乱。
运行前建议把几个变量写入环境变量,避免将密钥写在脚本中。
export TIMECHOAI_BASE_URL="https://ai.timecho.com"
export TIMECHOAI_API_KEY="your_api_key"
export TIMECHOAI_FORECAST_PATH="/"
python run_timechoai_forecast.py
如果接口返回的字段与示例不一致,优先修改 prediction = result.get("prediction", []) 这一行。数据清洗、绘图、结果保存等部分不需要跟着大改。
这条最小流程跑通后,可以按三个标准判断是否值得继续深入。第一,预测曲线要能接上历史曲线,不能在预测起点处突然跳到完全不相关的区间。第二,结果文件要能稳定生成,时间戳不能重复,也不能缺少预测步数。第三,同一份输入重复调用时,返回结果的结构必须一致,至少字段名、数组长度、时间顺序不能变化。只要这三点没问题,再去加入协变量和异常检测会更加稳妥。
如果这三点中有任何一项不稳定,先不要急着扩展功能。把输入窗口、采样粒度、接口返回结构固定下来,再继续加代码,排查成本会低很多。
验证阶段宁可慢一点,也别把多个变量一次性全加进去。
先让单变量流程稳定,再让协变量参与判断。
这样排查最省时间,也更稳妥。
收尾
TimechoAI 比较适合从小数据开始验证:一条曲线、一份 CSV、一个预测窗口,先看预测曲线是否符合业务直觉;再加入协变量,观察结果是否变得更稳定;最后把 REST API 或 Python SDK 接入现有看板、告警或巡检流程中。
它不是用来替代数据清洗,也不是替代现场判断。更合适的用法,是把过去只能依赖人工盯图的时序数据,推进到“能预测、能发现偏离、能把结果沉淀在系统里”的状态。



