Parquet数据导出与上传MaxCompute完整流程指南

2026-06-11阅读 0热度 0
其他

概述

本文的目标非常明确:将本地的 Parquet 文件(kuairand-27k 推荐系统数据集)完整导出,并上传至阿里云 MaxCompute 表。全流程涵盖数据探查、类型映射、建表与上传,附带了可直接复用的完整脚本,实现端到端自动化。

kuairand-27k的Parquet 数据导出与上传到 MaxCompute 完整流程(hstu格式)

1. 环境准备

依赖安装

先安装必需的 Python 包:

pip install pandas pyarrow pyodps

其中 pandaspyarrow 用于读取 Parquet 文件,pyodps 是阿里云 MaxCompute 的 Python SDK,三者缺一不可。

凭证配置

连接 MaxCompute 需要 AccessKey。强烈建议通过环境变量传入,避免硬编码——安全是底线。配置命令如下:

export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"

2. 数据探查:读取 Parquet 文件

上传前,必须摸清数据结构。用以下代码加载 Parquet 文件,快速掌握全貌:

import pandas as pd
df = pd.read_parquet("kuairand-27k-train-0.parquet")
print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"列名列表:")
print(df.columns.tolist())
print(f"数据类型:")
print(df.dtypes)
# 逐列查看前5条数据
for col in df.columns:
    print(f"--- {col} (dtype: {df[col].dtype}) ---")
    for i in range(min(5, len(df))):
        val = df[col].iloc[i]
        if isinstance(val, (list,)):
            print(f"[{i}] len={len(val)}, 前10个值: {val[:10]}")
        else:
            print(f"[{i}] {val}")

探查结果

执行后,数据集的画像清晰:共 1257 行、14 列,结构如下:

列名 Python 类型 说明
user_id int64 用户 ID
user_active_degree int64 用户活跃度
follow_user_num_range int64 关注人数区间
fans_user_num_range int64 粉丝人数区间
friend_user_num_range int64 好友人数区间
register_days_range int64 注册天数区间
video_id list(int) 用户历史交互视频序列
action_timestamp list(int) 行为时间戳序列
action_weight list(int) 行为权重序列(bitmask 编码)
watch_time list(int) 观看时长序列
item_video_id list(int) 候选视频 ID 序列
item_action_weight list(int) 候选视频行为标签
item_target_watchtime list(int) 候选视频目标观看时长
item_query_time list(int) 候选请求时间戳

3. 类型映射:Parquet → MaxCompute

数据类型必须严格对应,否则上传会失败。映射关系如下:

Parquet/Python 类型 MaxCompute 类型
int64(标量) bigint
list(int)(数组) array

规则非常直接:标量映射为 bigint,数组映射为 array

4. 完整上传脚本

以下是核心环节:完整的上传脚本。脚本已内建类型转换与批量写入逻辑,开箱即用。

#!/usr/bin/env python3
"""将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train"""
import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column

# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "http://service.cn.maxcompute.aliyun.com/api"

# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")

# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"
# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
    print(f"表 {TABLE_NAME} 已存在,正在删除...")
    odps.delete_table(TABLE_NAME)
    print(f"表 {TABLE_NAME} 已删除")

# 定义表 schema
columns = [
    # 用户侧标量字段 (bigint)
    Column(name="user_id", type="bigint"),
    Column(name="user_active_degree", type="bigint"),
    Column(name="follow_user_num_range", type="bigint"),
    Column(name="fans_user_num_range", type="bigint"),
    Column(name="friend_user_num_range", type="bigint"),
    Column(name="register_days_range", type="bigint"),
    # 历史序列字段 (array)
    Column(name="video_id", type="array"),
    Column(name="action_timestamp", type="array"),
    Column(name="action_weight", type="array"),
    Column(name="watch_time", type="array"),
    # 候选物料字段 (array)
    Column(name="item_video_id", type="array"),
    Column(name="item_action_weight", type="array"),
    Column(name="item_target_watchtime", type="array"),
    Column(name="item_query_time", type="array"),
]
schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")

# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")

# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = ["video_id", "action_timestamp", "action_weight", "watch_time",
              "item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"]
for col in array_cols:
    df[col] = df[col].apply(lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else []))

# 标量列确保为 Python int
scalar_cols = ["user_id", "user_active_degree", "follow_user_num_range",
               "fans_user_num_range", "friend_user_num_range", "register_days_range"]
for col in scalar_cols:
    df[col] = df[col].astype(int)

# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")
with table.open_writer() as writer:
    records = []
    for idx, row in df.iterrows():
        record = [
            int(row["user_id"]),
            int(row["user_active_degree"]),
            int(row["follow_user_num_range"]),
            int(row["fans_user_num_range"]),
            int(row["friend_user_num_range"]),
            int(row["register_days_range"]),
            list(row["video_id"]),
            list(row["action_timestamp"]),
            list(row["action_weight"]),
            list(row["watch_time"]),
            list(row["item_video_id"]),
            list(row["item_action_weight"]),
            list(row["item_target_watchtime"]),
            list(row["item_query_time"]),
        ]
        records.append(table.new_record(record))
        if (idx + 1) % 100 == 0:
            print(f"已处理 {idx + 1}/{len(df)} 行")
    writer.write(records)
print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")

5. 执行

先设置环境变量,然后运行脚本:

source env.sh  # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py

正常情况下输出类似:

project_name: pairec
endpoint: http://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
已处理 100/1257 行
...
已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train

看到以上日志,表示数据已顺利写入 MaxCompute。

6. 常见问题与注意事项

Endpoint 与 Project 不匹配

MaxCompute 的 Project 绑定至特定 Region 的 Endpoint。遇到 Project not found 错误,多半是 Endpoint 配置有误。请确认 Project 所在 Region 并切换对应 Endpoint。例如公网华东2的 Endpoint 为 service.cn.maxcompute.aliyun.com

Endpoint 适用场景
service.cn.maxcompute.aliyun.com 公网(华东2)

AccessKey 与 Endpoint 网络域不匹配

公网 AccessKey 仅能与公网 Endpoint 配合,内网 AccessKey 仅能用于内网 Endpoint。混用将引发 AccessKeyIdNotFound 错误。这是常见的陷阱,务必区分。

PyODPS 要求原生 Python 类型

使用 open_writer() 写入时,PyODPS 不接受 numpy 的 int64ndarray,必须转换为 Python 原生 intlist。上述脚本已实现自动转换;若自行编写,切勿遗漏,否则直接报类型错误。

批量上传性能

若数据量极大(百万行以上),建议分批写入以降低内存消耗。例如每批 5000 条:

BATCH_SIZE = 5000
with table.open_writer() as writer:
    batch = []
    for idx, row in df.iterrows():
        batch.append(table.new_record([...]))
        if len(batch) >= BATCH_SIZE:
            writer.write(batch)
            batch = []
    if batch:
        writer.write(batch)

该模式兼顾内存效率与写入速度,尤其适合生产环境。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策