Gravitino与Apache Flink流式处理实战精选
本教程聚焦于一个明确目标:利用 Apache Gravitino 与 Apache Flink 搭建流式数据管道。具体任务包括:配置 Gravitino Flink Connector,在 Gravitino 中创建 Hive 与 Paimon Catalog,定义基于 Kafka 的通用表(Generic Table),并通过 Flink SQL 将 Kafka 数据持续写入 Paimon。
整体架构:
前置条件
动手前,先确认环境已达标。
系统要求:
- Linux 或 macOS 操作系统
- JDK 17+(Gravitino 服务器必需;本教程假设使用 JDK 17 或更高版本)
- Apache Flink 1.18(推荐搭配 Gravitino Flink Connector)
必需组件:
- Gravitino 服务器 v1.2.0 或更新版本(本教程依赖 v1.1.0 之后引入的功能;详见 02-setup-guide/README.md)
- Hive Metastore(用于 Hive Catalog)
- Apache Kafka 集群(作为 Kafka 源表的数据源)
建议版本:
- Apache Paimon Connector JAR,需与 Flink 版本严格匹配
验证 Java 和 Flink 的基本可用性:
${JAVA_HOME}/bin/java -version
${FLINK_HOME}/bin/flink --version
分步操作指南
步骤 1:设定环境变量
以下变量贯穿整个教程,请根据实际环境替换对应值:
export GRAVITINO_URI="http://localhost:8090"
export METALAKE_NAME="default_metalake"
export HIVE_METASTORE_URI="thrift://localhost:9083"
export PAIMON_WAREHOUSE="file:///tmp/paimon-warehouse"
export KAFKA_BROKERS="localhost:9092"
步骤 2:在 Gravitino 中创建 Hive 与 Paimon Catalog
直接调用 Gravitino REST API 完成创建。如需传递 Hive 专属配置(例如 hive-conf-dir),请使用 flink.bypass. 前缀,这些参数会被自动转发至 Flink Hive Connector。
# 创建 Hive catalog
curl -X POST -H "Accept: application/vnd.gravitino.v1+json"
-H "Content-Type: application/json" -d '{
"name": "hive_catalog",
"type": "relational",
"comment": "Hive catalog for Flink streaming",
"provider": "hive",
"properties": {
"metastore.uris": "'"$HIVE_METASTORE_URI"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
# 创建 Paimon catalog(采用文件系统后端)
curl -X POST -H "Accept: application/vnd.gravitino.v1+json"
-H "Content-Type: application/json" -d '{
"name": "paimon_catalog",
"type": "relational",
"comment": "Paimon catalog for Flink streaming",
"provider": "lakehouse-paimon",
"properties": {
"catalog-backend": "filesystem",
"warehouse": "'"$PAIMON_WAREHOUSE"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
步骤 3:在 Flink 中安装所需 JAR 包
将以下 JAR 文件放入 FLINK_HOME/lib 目录,Flink SQL 客户端启动时会自动加载:
gravitino-flink-connector-runtime-1.18_2.12-.jar paimon-flink-1.18-.jar flink-sql-connector-kafka-.jar - Flink HiveCatalog 依赖的 Hive 相关 JAR(与 Flink-Hive 集成要求一致)
注意:Kafka SQL Connector 未包含在 Flink 官方发行版中,需额外手动添加。
步骤 4:配置 Flink 使用 Gravitino Catalog Store
编辑 FLINK_HOME/conf/flink-conf.yaml,追加以下配置项,并将变量替换为实际值:
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: ${METALAKE_NAME}
table.catalog-store.gravitino.gravitino.uri: ${GRAVITINO_URI}
若 Flink 集群正在运行,请重启使其生效。验证集群状态:
${FLINK_HOME}/bin/start-cluster.sh
curl -sS http://localhost:8081/overview
如果 curl 返回连接拒绝,步骤 7 中的 INSERT INTO ... SELECT ... 将失败,因为 SQL 客户端无法向集群提交作业。
步骤 5:在 Hive Catalog 中创建 Kafka 通用表
Flink 的 HiveCatalog 同时支持 Hive 兼容表与通用表。在 HiveCatalog 中,若不显式指定 'connector' = 'hive' 或使用 Hive dialect,表默认为通用表。下面我们创建一个 Kafka 通用表,元数据保存在 Hive Metastore,数据由 Flink 从 Kafka 读取。如需 Hive 兼容表,请改用 Hive dialect 或设置 'connector' = 'hive'。
启动 Flink SQL 客户端:
${FLINK_HOME}/bin/sql-client.sh
在 SQL 客户端中执行以下语句:
-- 使用 Gravitino 管理的 Hive catalog
USE CATALOG hive_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
-- Kafka 源表,作为通用表存储在 Hive catalog 中
CREATE TABLE kafka_events (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '${KAFKA_BROKERS}', -- 替换为您的 Kafka brokers
'properties.group.id' = 'gravitino-flink-demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
关于通用表的关键说明:
- HiveCatalog 能够管理 Hive 兼容表和通用表。Hive 兼容表以 Hive 原生格式存储,可直接从 Hive 查询。
- 通用表是 Flink 特有的概念。Hive 虽然能在 Metastore 中看到其元数据,但通常无法解析表内容,从 Hive 侧查询的行为是未定义的。
- 若要使用默认 dialect 创建 Hive 兼容表,请设置
'connector' = 'hive'。若使用 Hive dialect,则无需指定connector属性。 - 在 Gravitino 中,通用表的 Schema 和分区键会以
flink.*属性形式存储在 Hive Metastore。当connector=hive时,表被视为具有原生 Hive Schema 的 Hive 兼容表。
步骤 6:创建 Paimon 结果表
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
CREATE TABLE paimon_user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3)
);
步骤 7:将数据从 Kafka 流式写入 Paimon
SET 'execution.checkpointing.interval' = '10 s';
INSERT INTO paimon_catalog.streaming_db.paimon_user_behavior
SELECT user_id, item_id, behavior, ts
FROM hive_catalog.streaming_db.kafka_events;
若 Kafka 中 user_behavior 主题持续有数据流入,Flink 会实时将其写入 Paimon 表。
注意:流式写入 Paimon 依赖定期 Checkpoint 才能完成数据提交,不设置 Checkpoint 将导致数据无法持久化。
代码示例
Kafka 示例消息(JSON 格式):
{"user_id": 1, "item_id": 1001, "behavior": "click"}
{"user_id": 2, "item_id": 1002, "behavior": "buy"}
故障排查
以下是常见问题及对应解决方案:
- Flink 中 Catalog 不可见:首先检查
flink-conf.yaml中table.catalog-store.*配置是否正确,然后确认 Gravitino 服务器网络可达。 - ClassNotFoundException:确保 Gravitino Connector、Kafka Connector 和 Paimon JAR 均已放置到
FLINK_HOME/lib目录下。 - java.net.ConnectException: Connection refused(执行
INSERT INTO时):Flink SQL 客户端无法访问 JobManager 的 REST 端点(默认localhost:8081)。使用${FLINK_HOME}/bin/start-cluster.sh启动集群,并通过curl http://localhost:8081/overview验证连通性。 - 作业处于 RUNNING 状态但 Paimon 中无新数据:确保流处理模式下已启用 Checkpoint(例如
SET 'execution.checkpointing.interval' = '10 s';),并在 Flink Web UI 或/jobs/页面查看 Checkpoint 进度。/checkpoints - 作业处于 RUNNING 状态但重启后预期记录被跳过:Kafka 偏移量由
properties.group.id管理。如需重新消费,请更换一个新的 group id(例如gravitino-flink-demo-v2)。 - 表未找到:使用完全限定名引用表,例如
hive_catalog.streaming_db.kafka_events和paimon_catalog.streaming_db.paimon_user_behavior。
恭喜
至此,你已成功完成一个完整的 Gravitino Flink 流式处理实战。当前环境具备以下能力:
- 妥善配置的 Gravitino Flink Connector,实现统一的 Catalog 访问
- 在 Gravitino 中注册的 Hive 与 Paimon Catalog,可通过 Flink SQL 直接操作
- 一个稳定运行的流式管道,从 Kafka 读取数据并持续写入 Paimon
- 对 HiveCatalog 中通用表与 Hive 兼容表差异的清晰理解
你的 Flink 环境现已准备好利用 Gravitino 在流式数据生态中完成统一的元数据管理。
延伸阅读
如需更高级的配置和详细文档:
- 查阅 Gravitino Flink Connector 文档,了解更多配置选项与最佳实践
- 深入学习 Apache Flink SQL,掌握多样化的查询模式
- 探索 Apache Paimon with Flink,解锁 Paimon 的特有功能
下一步
- 尝试 Gravitino 的 Iceberg Catalog,参考 03-iceberg-catalog/README.md
- 使用 Trino 执行联邦查询,参考 06-trino-query/README.md
Apache Gravitino 正处于快速迭代期,本文基于 1.1.0 版本撰写。若遇到问题,建议查阅官方文档或在社区直接提交 Issue。
