Parquet数据导出与上传MaxCompute完整流程指南
概述
本文的目标非常明确:将本地的 Parquet 文件(kuairand-27k 推荐系统数据集)完整导出,并上传至阿里云 MaxCompute 表。全流程涵盖数据探查、类型映射、建表与上传,附带了可直接复用的完整脚本,实现端到端自动化。
1. 环境准备
依赖安装
先安装必需的 Python 包:
pip install pandas pyarrow pyodps
其中 pandas 和 pyarrow 用于读取 Parquet 文件,pyodps 是阿里云 MaxCompute 的 Python SDK,三者缺一不可。
凭证配置
连接 MaxCompute 需要 AccessKey。强烈建议通过环境变量传入,避免硬编码——安全是底线。配置命令如下:
export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"
2. 数据探查:读取 Parquet 文件
上传前,必须摸清数据结构。用以下代码加载 Parquet 文件,快速掌握全貌:
import pandas as pd
df = pd.read_parquet("kuairand-27k-train-0.parquet")
print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"列名列表:")
print(df.columns.tolist())
print(f"数据类型:")
print(df.dtypes)
# 逐列查看前5条数据
for col in df.columns:
print(f"--- {col} (dtype: {df[col].dtype}) ---")
for i in range(min(5, len(df))):
val = df[col].iloc[i]
if isinstance(val, (list,)):
print(f"[{i}] len={len(val)}, 前10个值: {val[:10]}")
else:
print(f"[{i}] {val}")
探查结果
执行后,数据集的画像清晰:共 1257 行、14 列,结构如下:
| 列名 | Python 类型 | 说明 |
|---|---|---|
user_id | int64 | 用户 ID |
user_active_degree | int64 | 用户活跃度 |
follow_user_num_range | int64 | 关注人数区间 |
fans_user_num_range | int64 | 粉丝人数区间 |
friend_user_num_range | int64 | 好友人数区间 |
register_days_range | int64 | 注册天数区间 |
video_id | list(int) | 用户历史交互视频序列 |
action_timestamp | list(int) | 行为时间戳序列 |
action_weight | list(int) | 行为权重序列(bitmask 编码) |
watch_time | list(int) | 观看时长序列 |
item_video_id | list(int) | 候选视频 ID 序列 |
item_action_weight | list(int) | 候选视频行为标签 |
item_target_watchtime | list(int) | 候选视频目标观看时长 |
item_query_time | list(int) | 候选请求时间戳 |
3. 类型映射:Parquet → MaxCompute
数据类型必须严格对应,否则上传会失败。映射关系如下:
| Parquet/Python 类型 | MaxCompute 类型 |
|---|---|
int64(标量) | bigint |
list(int)(数组) | array |
规则非常直接:标量映射为 bigint,数组映射为 array。
4. 完整上传脚本
以下是核心环节:完整的上传脚本。脚本已内建类型转换与批量写入逻辑,开箱即用。
#!/usr/bin/env python3
"""将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train"""
import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column
# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "http://service.cn.maxcompute.aliyun.com/api"
# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")
# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"
# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
print(f"表 {TABLE_NAME} 已存在,正在删除...")
odps.delete_table(TABLE_NAME)
print(f"表 {TABLE_NAME} 已删除")
# 定义表 schema
columns = [
# 用户侧标量字段 (bigint)
Column(name="user_id", type="bigint"),
Column(name="user_active_degree", type="bigint"),
Column(name="follow_user_num_range", type="bigint"),
Column(name="fans_user_num_range", type="bigint"),
Column(name="friend_user_num_range", type="bigint"),
Column(name="register_days_range", type="bigint"),
# 历史序列字段 (array)
Column(name="video_id", type="array"),
Column(name="action_timestamp", type="array"),
Column(name="action_weight", type="array"),
Column(name="watch_time", type="array"),
# 候选物料字段 (array)
Column(name="item_video_id", type="array"),
Column(name="item_action_weight", type="array"),
Column(name="item_target_watchtime", type="array"),
Column(name="item_query_time", type="array"),
]
schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")
# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")
# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = ["video_id", "action_timestamp", "action_weight", "watch_time",
"item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"]
for col in array_cols:
df[col] = df[col].apply(lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else []))
# 标量列确保为 Python int
scalar_cols = ["user_id", "user_active_degree", "follow_user_num_range",
"fans_user_num_range", "friend_user_num_range", "register_days_range"]
for col in scalar_cols:
df[col] = df[col].astype(int)
# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")
with table.open_writer() as writer:
records = []
for idx, row in df.iterrows():
record = [
int(row["user_id"]),
int(row["user_active_degree"]),
int(row["follow_user_num_range"]),
int(row["fans_user_num_range"]),
int(row["friend_user_num_range"]),
int(row["register_days_range"]),
list(row["video_id"]),
list(row["action_timestamp"]),
list(row["action_weight"]),
list(row["watch_time"]),
list(row["item_video_id"]),
list(row["item_action_weight"]),
list(row["item_target_watchtime"]),
list(row["item_query_time"]),
]
records.append(table.new_record(record))
if (idx + 1) % 100 == 0:
print(f"已处理 {idx + 1}/{len(df)} 行")
writer.write(records)
print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")
5. 执行
先设置环境变量,然后运行脚本:
source env.sh # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py
正常情况下输出类似:
project_name: pairec
endpoint: http://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
已处理 100/1257 行
...
已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train
看到以上日志,表示数据已顺利写入 MaxCompute。
6. 常见问题与注意事项
Endpoint 与 Project 不匹配
MaxCompute 的 Project 绑定至特定 Region 的 Endpoint。遇到 Project not found 错误,多半是 Endpoint 配置有误。请确认 Project 所在 Region 并切换对应 Endpoint。例如公网华东2的 Endpoint 为 service.cn.maxcompute.aliyun.com:
| Endpoint | 适用场景 |
|---|---|
service.cn.maxcompute.aliyun.com | 公网(华东2) |
AccessKey 与 Endpoint 网络域不匹配
公网 AccessKey 仅能与公网 Endpoint 配合,内网 AccessKey 仅能用于内网 Endpoint。混用将引发 AccessKeyIdNotFound 错误。这是常见的陷阱,务必区分。
PyODPS 要求原生 Python 类型
使用 open_writer() 写入时,PyODPS 不接受 numpy 的 int64 或 ndarray,必须转换为 Python 原生 int 和 list。上述脚本已实现自动转换;若自行编写,切勿遗漏,否则直接报类型错误。
批量上传性能
若数据量极大(百万行以上),建议分批写入以降低内存消耗。例如每批 5000 条:
BATCH_SIZE = 5000
with table.open_writer() as writer:
batch = []
for idx, row in df.iterrows():
batch.append(table.new_record([...]))
if len(batch) >= BATCH_SIZE:
writer.write(batch)
batch = []
if batch:
writer.write(batch)
该模式兼顾内存效率与写入速度,尤其适合生产环境。
