TimechoAI上手指南:时序数据预测分析全攻略

2026-06-11阅读 0热度 0
ai

时序分析项目里,真正消耗精力的往往不是绘制曲线,而是曲线背后那些更棘手的问题:未来几个时序点的走向如何判断,异常是否才刚冒头,换一组设备数据后是否需要从头再搭一套模型。TimechoAI 正好切入这个场景。官方产品页将其定位为“基于 Timer 系列时序大模型”,提供自然语言问答、趋势预测、异常检测,以及 Web 界面、REST API、Python SDK 等多种接入方式。

先构造一份能跑通流程的样本数据

直接对接真实设备数据是最终目标,但上手阶段建议先用一段包含周期波动与随机噪声的样例数据,把“数据清洗、模型预测、结果绘图、输出保存”这几个环节完整跑一遍。

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
time_index = pd.date_range("2026-06-01 00:00:00", periods=7 * 24 * 4, freq="15min")
base = 55 + 8 * np.sin(np.arange(len(time_index)) / 12)
noise = rng.normal(0, 1.8, len(time_index))
pressure = base + noise

# 模拟一小段异常抬升
pressure[420:445] += np.linspace(0, 14, 25)

df = pd.DataFrame({
    "time": time_index,
    "pressure": pressure.round(3),
    "motor_current": (12 + pressure / 18 + rng.normal(0, 0.3, len(time_index))).round(3),
    "valve_opening": (45 + 12 * np.sin(np.arange(len(time_index)) / 30)).round(3),
})
df.to_csv("pump_pressure.csv", index=False)
print(df.head())

这份 CSV 包含一个目标变量 pressure,以及两个可选的协变量字段:motor_currentvalve_opening。后续通过 TimechoAI 上传 CSV 时,建议先从纯目标变量预测入手,再逐步加入协变量来对比效果。

上传前先对 CSV 做本地清洗

官方文档说明 TimechoAI 支持手动输入、绘制曲线、上传 CSV,预测步数范围可设置为 1 到 720 步。上传之前,建议先做一次本地预检,重点关注时间戳格式、缺失值分布和采样间隔一致性。

from pathlib import Path

csv_path = Path("pump_pressure.csv")
df = pd.read_csv(csv_path)
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values("time").drop_duplicates("time")
series = (df.set_index("time")["pressure"].asfreq("15min"))

print("rows:", len(df))
print("missing:", int(series.isna().sum()))
print("start:", series.index.min())
print("end:", series.index.max())

series = series.interpolate(limit_direction="both")
history = series.tail(256).round(4).tolist()

如果缺失值集中在某一段区间内,不建议直接交给模型处理。插值操作会把真实异常趋势抹平,后续的预测和异常检测结果都会失真。

用目标变量跑一次预测流程

从 TimechoAI 官方产品截图可以看到预测分析界面:完成预测后会展示推理耗时、预测点数、目标变量曲线与预测曲线,并提供导出功能入口。

网页端可以按这个顺序操作:新增一段时序数据,选定目标变量,设置预测步数,观察输出曲线是否符合业务预期。官方界面中包含了“预测参数设置”“添加时序数据”“上传文件”等入口,说明平台不仅支持代码调用,也能先通过页面做低成本验证。建议让懂设备业务的人员先看曲线,他们比开发更快判断某段预测是否明显偏离实际情况。

预测步数不要随意填写。15 分钟采样间隔下,预测 32 步对应未来约 8 小时;预测 96 步则对应未来约 1 天。采样粒度一旦变化,预测步数的业务含义也会随之改变。很多时序项目出问题,不是因为模型接口没调通,而是“步数”这个参数在业务层面没有定义清楚。

如果通过 REST API 接入,接口路径和字段名以平台官方文档为准。代码里建议把路径做成环境变量,避免将未确认的 endpoint 硬编码。

import os
import requests

BASE_URL = os.getenv("TIMECHOAI_BASE_URL", "https://ai.timecho.com")
API_KEY = os.environ["TIMECHOAI_API_KEY"]
FORECAST_PATH = os.getenv("TIMECHOAI_FORECAST_PATH", "/")

payload = {
    "mode": "target_only",
    "horizon": 32,
    "time_unit": "15min",
    "target": history,
}

resp = requests.post(
    BASE_URL.rstrip("/") + FORECAST_PATH,
    headers={"Authorization": f"Bearer {API_KEY}"},
    json=payload,
    timeout=30,
)
resp.raise_for_status()
result = resp.json()
print(result.keys())

在网页端体验时,本质上完成了三个动作:准备历史数据点、设置预测步长、查看预测曲线。接口接入时也尽量保持这个结构,不要把数据清洗、模型调用、结果入库混在同一个函数里。

def build_target_only_payload(values, horizon=32, time_unit="15min"):
    return {
        "mode": "target_only",
        "horizon": horizon,
        "time_unit": time_unit,
        "target": [float(v) for v in values],
    }

payload = build_target_only_payload(history, horizon=32)

实际接入时,建议把调用封装成一个轻薄的客户端类。这样后续接口路径、鉴权头、超时策略有变化时,只需改动一处。

class TimechoAIClient:
    def __init__(self, base_url, api_key, timeout=30):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout

    def post(self, path, payload):
        resp = requests.post(
            self.base_url + path,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=payload,
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

client = TimechoAIClient(BASE_URL, API_KEY)
result = client.post(FORECAST_PATH, payload)

这个封装不做复杂设计,只处理三件事:拼接 URL、携带 API Key、检查 HTTP 状态码。预测逻辑、数据清洗、结果保存都不要塞进这个类里,否则后续排查会变得困难。

加协变量之前,先确认它确实有贡献

官方文档提到 TimechoAI 支持三种预测模式:纯目标变量、历史协变量、未来协变量。这一点很关键,因为很多时序曲线的变化并非孤立事件。

例如压力曲线可能受电机电流和阀门开度影响;能耗曲线可能关联产量、班次、温度等变量。协变量不是字段堆得越多越好,应该先评估相关性和业务可解释性。

features = df.set_index("time")[["pressure", "motor_current", "valve_opening"]].asfreq("15min")
features = features.interpolate(limit_direction="both")
print(features.corr(numeric_only=True)["pressure"].sort_values(ascending=False))

构造带协变量的 payload 时,把目标变量和辅助变量分开处理。历史协变量只放已经发生的过去数据;未来协变量只放预测窗口内确实能提前获知的数据。

window = features.tail(256)

payload = {
    "mode": "with_covariates",
    "horizon": 32,
    "time_unit": "15min",
    "target": window["pressure"].round(4).tolist(),
    "history_covariates": {
        "motor_current": window["motor_current"].round(4).tolist(),
        "valve_opening": window["valve_opening"].round(4).tolist(),
    },
    "future_covariates": {}
}

如果后续能拿到排产计划、气象预报、节假日标记这类未来已知信息,再放入 future_covariates。无法获取未来值的字段,不要强行塞入。

协变量有一个容易忽略的限制:长度和时间轴必须对齐。目标变量是 15 分钟一个点,协变量也需要按相同粒度处理。某个字段如果是小时级数据,最好先重采样到 15 分钟,再决定采用前向填充还是插值。这个步骤看似不起眼,但会直接影响模型对输入关系的理解。

hourly_weather = pd.DataFrame({
    "time": pd.date_range("2026-06-01", periods=7 * 24, freq="1h"),
    "temperature": 28 + np.sin(np.arange(7 * 24) / 6),
})
weather_15min = (hourly_weather.set_index("time")
                 .asfreq("15min")
                 .ffill())
aligned = features.join(weather_15min, how="left").ffill()
print(aligned.tail())

如果协变量本身也是预测值(比如天气预报),需要在调用记录里标明数据来源。后续主序列预测偏差变大时,才能判断是 TimechoAI 的模型结果不稳定,还是未来协变量本身已经偏离真实值。

预测结果一定要可视化查看

只看返回值很难判断模型是否跟住了趋势。把预测曲线、历史曲线、置信区间放在同一张图里,异常点和过度平滑的情况会一目了然。

import matplotlib.pyplot as plt

def plot_forecast(series, prediction, lower=None, upper=None, freq="15min"):
    history = series.tail(96)
    future_index = pd.date_range(
        history.index[-1] + pd.Timedelta(freq),
        periods=len(prediction),
        freq=freq,
    )
    plt.figure(figsize=(12, 5))
    plt.plot(history.index, history.values, label="history", linewidth=2)
    plt.plot(future_index, prediction, label="forecast", linewidth=2)
    if lower is not None and upper is not None:
        plt.fill_between(future_index, lower, upper, alpha=0.18, label="interval")
    plt.legend()
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()

# prediction 字段名以实际返回为准
prediction = result.get("prediction", [])
plot_forecast(series, prediction)

这一步不要跳过。如果预测结果突然发散,或者预测区间快速展宽,说明后续的点只能当作趋势参考,不适合直接触发强决策动作。

需要将图片保存到报告或看板时,把 show() 替换为 savefig()

def save_forecast_png(series, prediction, output="forecast.png"):
    history = series.tail(96)
    future_index = pd.date_range(
        history.index[-1] + pd.Timedelta("15min"),
        periods=len(prediction),
        freq="15min",
    )
    fig, ax = plt.subplots(figsize=(12, 5))
    ax.plot(history.index, history.values, label="history", linewidth=2)
    ax.plot(future_index, prediction, label="forecast", linewidth=2)
    ax.legend()
    ax.grid(alpha=0.25)
    fig.autofmt_xdate()
    fig.tight_layout()
    fig.savefig(output, dpi=160)
    plt.close(fig)

保存图片还有一个好处:同一条曲线可以对比不同输入模式的效果。纯目标变量、加入历史协变量、加入未来协变量各保存一张图,哪个版本更稳定,肉眼很快就能判断出来。

数据集与模型管理也要同步关注

TimechoAI 官方页面还提供了数据集管理能力:内置工业、能源、金融、气象、医疗等真实数据集,支持原生 TsFile,支持多设备、多测点、多粒度数据组织。这些功能虽然不如预测按钮直观,但在实际项目中必不可少。

如果数据长期只靠临时 CSV 传递,很快就会变得混乱。建议先在本地把字段规范约定好,再接入平台的数据集管理流程。

schema = {
    "time": "datetime64[ns]",
    "pressure": "float64",
    "motor_current": "float64",
    "valve_opening": "float64",
}
for column, dtype in schema.items():
    assert column in df.columns, f"missing column: {column}"

clean_df = df.astype({
    "pressure": "float64",
    "motor_current": "float64",
    "valve_opening": "float64",
})
clean_df.to_csv("pump_pressure_clean.csv", index=False)

如果数据原先存储在 IoTDB 或其他时序数据库中,可以先用脚本拉取最近窗口,再转换为 TimechoAI 可接收的结构。下面用伪连接参数示意读取方式,实际主机、用户名、路径按自己的环境填写。

from iotdb.Session import Session

session = Session("127.0.0.1", 6667, "root", "root")
session.open(False)

sql = """
SELECT pressure, motor_current, valve_opening
FROM root.factory.pump_01
WHERE time >= 2026-06-01T00:00:00
"""
dataset = session.execute_query_statement(sql)
rows = []
while dataset.has_next():
    row = dataset.next()
    rows.append({
        "time": row.get_timestamp(),
        "pressure": row.get_fields()[0].get_float_value(),
        "motor_current": row.get_fields()[1].get_float_value(),
        "valve_opening": row.get_fields()[2].get_float_value(),
    })
session.close()
iotdb_df = pd.DataFrame(rows)

这一步的目标不是绑定某一种数据库,而是把“数据来源”与“模型调用”解耦。只要数据能整理成稳定的时间戳和数值列,后续就能按同一套流程做预测。

模型管理也不是装饰功能。官方页面提到可以管理训练任务、配置模型与训练超参数,并记录模型版本、训练参数以及 Checkpoint 路径。如果只是单次体验,可能感受不到它的必要性;但一旦涉及不同测点、不同数据窗口、不同参数组合的反复尝试,就必须清楚每次结果来自哪一组配置。

本地系统也可以同步保留一份调用日志,方便后期排查。

CREATE TABLE forecast_call_log (
    id BIGINT PRIMARY KEY,
    measurement VARCHAR(128) NOT NULL,
    horizon INT NOT NULL,
    time_unit VARCHAR(32) NOT NULL,
    request_hash VARCHAR(64) NOT NULL,
    model_name VARCHAR(128),
    model_version VARCHAR(128),
    status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL
);

调用之前可以计算一个请求哈希值,保存到日志表里。后续同一测点多次预测时,用哈希值就能快速判断输入窗口是否完全一致。

import hashlib
import json

def request_hash(payload):
    raw = json.dumps(payload, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

payload_hash = request_hash(payload)
print(payload_hash)

异常检测可以和预测协同使用

固定阈值只能处理“超过设定值”的简单场景,渐变型异常更让人头疼。比如压力缓慢爬升、电流长期漂移,绝对值可能还没超限,但趋势已经发生偏移。Timer 官方页面提到了异常检测能力,TimechoAI 产品页也把异常检测列为核心功能之一。

接口字段仍以官方文档为准,结构可以这样拆分:

ANOMALY_PATH = os.getenv("TIMECHOAI_ANOMALY_PATH", "/")

payload = {
    "series": series.tail(256).round(4).tolist(),
    "window": 32,
    "sensitivity": "medium",
}

resp = requests.post(
    BASE_URL.rstrip("/") + ANOMALY_PATH,
    headers={"Authorization": f"Bearer {API_KEY}"},
    json=payload,
    timeout=30,
)
resp.raise_for_status()
anomaly_result = resp.json()

异常检测的结果建议至少保存三类信息:异常分数、异常区间、原始数据片段。只保存一个“异常”标签,后续很难做复盘分析。

def collect_anomaly_points(series, scores, threshold=0.8):
    rows = []
    tail = series.tail(len(scores))
    for ts, value, score in zip(tail.index, tail.values, scores):
        if score >= threshold:
            rows.append({
                "time": ts.isoformat(),
                "value": float(value),
                "score": float(score),
            })
    return rows

scores = anomaly_result.get("scores", [])
anomaly_points = collect_anomaly_points(series, scores)
print(anomaly_points[:5])

异常检测和预测可以组合使用。当异常分数连续升高时,再触发一次短期预测,观察未来几个点是否继续抬升。这种方式比单纯看阈值更精细,也不会把每次小波动都当作故障处理。

def should_forecast_after_anomaly(scores, threshold=0.8, min_count=3):
    recent = scores[-8:]
    hit_count = sum(score >= threshold for score in recent)
    return hit_count >= min_count

if should_forecast_after_anomaly(scores):
    payload = build_target_only_payload(series.tail(256).tolist(), horizon=32)
    forecast_after_anomaly = client.post(FORECAST_PATH, payload)

这个判断逻辑比较朴素,但适合作为第一版规则。后续如果误报较多,再把阈值、连续次数、设备状态、工况标签等因素加进来。

多条曲线不要逐一手动操作

真实场景中通常不是单条曲线,而是一批设备、一批测点。网页端适合先做效果验证,后续更适合通过脚本批量处理。

measurements = {
    "pump_01.pressure": features["pressure"],
    "pump_01.current": features["motor_current"],
    "pump_01.valve": features["valve_opening"],
}

def build_batch_payload(measurements, horizon=32):
    series_list = []
    for name, s in measurements.items():
        values = s.dropna().tail(256).round(4).tolist()
        series_list.append({
            "name": name,
            "target": values,
        })
    return {
        "mode": "batch_target_only",
        "horizon": horizon,
        "time_unit": "15min",
        "series": series_list,
    }

batch_payload = build_batch_payload(measurements)

批量任务最容易遗漏的是失败处理。某一条曲线缺失严重、长度不足、字段格式不对,都不应该影响其他曲线的预测。

def validate_series(name, values, min_points=64):
    if len(values) < min_points:
        return f"{name}: points less than {min_points}"
    if any(pd.isna(values)):
        return f"{name}: contains NaN"
    return None

errors = []
for item in batch_payload["series"]:
    err = validate_series(item["name"], item["target"])
    if err:
        errors.append(err)
print(errors)

批量调用的结果也要分开保存,不要只存一个完整的 JSON 文件,否则单条曲线回溯时查找困难。

def flatten_batch_result(batch_result):
    rows = []
    for item in batch_result.get("series", []):
        name = item["name"]
        for point in item.get("prediction", []):
            rows.append({
                "measurement": name,
                "forecast_time": point["time"],
                "predicted_value": point["value"],
                "lower_bound": point.get("lower_bound"),
                "upper_bound": point.get("upper_bound"),
            })
    return pd.DataFrame(rows)

把最小流程整合成一个脚本

前面几段代码分散在不同步骤中,实际运行时可以先整理成一个最小脚本。它只做四件事:读取 CSV、清洗时间轴、调用预测接口、保存结果。先把这个脚本跑稳定,再逐步加入协变量、异常检测和批量任务。

def load_series(path, column, freq="15min", tail=256):
    df = pd.read_csv(path)
    df["time"] = pd.to_datetime(df["time"])
    df = df.sort_values("time").drop_duplicates("time")
    s = df.set_index("time")[column].asfreq(freq)
    s = s.interpolate(limit_direction="both")
    return s.tail(tail)

def save_prediction_csv(series, prediction, output="timechoai_prediction.csv"):
    future_index = pd.date_range(
        series.index[-1] + pd.Timedelta("15min"),
        periods=len(prediction),
        freq="15min",
    )
    out = pd.DataFrame({
        "time": future_index,
        "predicted_value": prediction,
    })
    out.to_csv(output, index=False)
    return output

def main():
    s = load_series("pump_pressure.csv", "pressure")
    payload = build_target_only_payload(s.tolist(), horizon=32)
    result = client.post(FORECAST_PATH, payload)
    prediction = result.get("prediction", [])
    save_prediction_csv(s, prediction)
    save_forecast_png(s, prediction)

if __name__ == "__main__":
    main()

这类脚本最适合放在验证阶段。它不追求复杂,但每一步都能单独检查:CSV 是否正确读取、时间轴是否已补齐、接口是否返回预测数组、结果文件能否被看板或报表正常读取。等这条链路跑通,再把 load_series() 替换为数据库读取,把 save_prediction_csv() 替换为写入业务库,代码结构也不会混乱。

运行前建议把几个变量写入环境变量,避免将密钥写在脚本中。

export TIMECHOAI_BASE_URL="https://ai.timecho.com"
export TIMECHOAI_API_KEY="your_api_key"
export TIMECHOAI_FORECAST_PATH="/"
python run_timechoai_forecast.py

如果接口返回的字段与示例不一致,优先修改 prediction = result.get("prediction", []) 这一行。数据清洗、绘图、结果保存等部分不需要跟着大改。

这条最小流程跑通后,可以按三个标准判断是否值得继续深入。第一,预测曲线要能接上历史曲线,不能在预测起点处突然跳到完全不相关的区间。第二,结果文件要能稳定生成,时间戳不能重复,也不能缺少预测步数。第三,同一份输入重复调用时,返回结果的结构必须一致,至少字段名、数组长度、时间顺序不能变化。只要这三点没问题,再去加入协变量和异常检测会更加稳妥。

如果这三点中有任何一项不稳定,先不要急着扩展功能。把输入窗口、采样粒度、接口返回结构固定下来,再继续加代码,排查成本会低很多。

验证阶段宁可慢一点,也别把多个变量一次性全加进去。

先让单变量流程稳定,再让协变量参与判断。

这样排查最省时间,也更稳妥。

收尾

TimechoAI 比较适合从小数据开始验证:一条曲线、一份 CSV、一个预测窗口,先看预测曲线是否符合业务直觉;再加入协变量,观察结果是否变得更稳定;最后把 REST API 或 Python SDK 接入现有看板、告警或巡检流程中。

它不是用来替代数据清洗,也不是替代现场判断。更合适的用法,是把过去只能依赖人工盯图的时序数据,推进到“能预测、能发现偏离、能把结果沉淀在系统里”的状态。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策