Gravitino与Apache Flink流式处理实战精选

2026-06-11阅读 0热度 0
数据挖掘

本教程聚焦于一个明确目标:利用 Apache Gravitino 与 Apache Flink 搭建流式数据管道。具体任务包括:配置 Gravitino Flink Connector,在 Gravitino 中创建 Hive 与 Paimon Catalog,定义基于 Kafka 的通用表(Generic Table),并通过 Flink SQL 将 Kafka 数据持续写入 Paimon。

整体架构:

前置条件

动手前,先确认环境已达标。

系统要求:

  • Linux 或 macOS 操作系统
  • JDK 17+(Gravitino 服务器必需;本教程假设使用 JDK 17 或更高版本)
  • Apache Flink 1.18(推荐搭配 Gravitino Flink Connector)

必需组件:

  • Gravitino 服务器 v1.2.0 或更新版本(本教程依赖 v1.1.0 之后引入的功能;详见 02-setup-guide/README.md)
  • Hive Metastore(用于 Hive Catalog)
  • Apache Kafka 集群(作为 Kafka 源表的数据源)

建议版本:

  • Apache Paimon Connector JAR,需与 Flink 版本严格匹配

验证 Java 和 Flink 的基本可用性:

${JAVA_HOME}/bin/java -version
${FLINK_HOME}/bin/flink --version

分步操作指南

步骤 1:设定环境变量

以下变量贯穿整个教程,请根据实际环境替换对应值:

export GRAVITINO_URI="http://localhost:8090"
export METALAKE_NAME="default_metalake"
export HIVE_METASTORE_URI="thrift://localhost:9083"
export PAIMON_WAREHOUSE="file:///tmp/paimon-warehouse"
export KAFKA_BROKERS="localhost:9092"

步骤 2:在 Gravitino 中创建 Hive 与 Paimon Catalog

直接调用 Gravitino REST API 完成创建。如需传递 Hive 专属配置(例如 hive-conf-dir),请使用 flink.bypass. 前缀,这些参数会被自动转发至 Flink Hive Connector。

# 创建 Hive catalog
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" 
  -H "Content-Type: application/json" -d '{
    "name": "hive_catalog",
    "type": "relational",
    "comment": "Hive catalog for Flink streaming",
    "provider": "hive",
    "properties": {
      "metastore.uris": "'"$HIVE_METASTORE_URI"'"
    }
  }' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs

# 创建 Paimon catalog(采用文件系统后端)
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" 
  -H "Content-Type: application/json" -d '{
    "name": "paimon_catalog",
    "type": "relational",
    "comment": "Paimon catalog for Flink streaming",
    "provider": "lakehouse-paimon",
    "properties": {
      "catalog-backend": "filesystem",
      "warehouse": "'"$PAIMON_WAREHOUSE"'"
    }
  }' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs

步骤 3:在 Flink 中安装所需 JAR 包

将以下 JAR 文件放入 FLINK_HOME/lib 目录,Flink SQL 客户端启动时会自动加载:

  • gravitino-flink-connector-runtime-1.18_2.12-.jar
  • paimon-flink-1.18-.jar
  • flink-sql-connector-kafka-.jar
  • Flink HiveCatalog 依赖的 Hive 相关 JAR(与 Flink-Hive 集成要求一致)

注意:Kafka SQL Connector 未包含在 Flink 官方发行版中,需额外手动添加。

步骤 4:配置 Flink 使用 Gravitino Catalog Store

编辑 FLINK_HOME/conf/flink-conf.yaml,追加以下配置项,并将变量替换为实际值:

table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: ${METALAKE_NAME}
table.catalog-store.gravitino.gravitino.uri: ${GRAVITINO_URI}

若 Flink 集群正在运行,请重启使其生效。验证集群状态:

${FLINK_HOME}/bin/start-cluster.sh
curl -sS http://localhost:8081/overview

如果 curl 返回连接拒绝,步骤 7 中的 INSERT INTO ... SELECT ... 将失败,因为 SQL 客户端无法向集群提交作业。

步骤 5:在 Hive Catalog 中创建 Kafka 通用表

Flink 的 HiveCatalog 同时支持 Hive 兼容表与通用表。在 HiveCatalog 中,若不显式指定 'connector' = 'hive' 或使用 Hive dialect,表默认为通用表。下面我们创建一个 Kafka 通用表,元数据保存在 Hive Metastore,数据由 Flink 从 Kafka 读取。如需 Hive 兼容表,请改用 Hive dialect 或设置 'connector' = 'hive'

启动 Flink SQL 客户端:

${FLINK_HOME}/bin/sql-client.sh

在 SQL 客户端中执行以下语句:

-- 使用 Gravitino 管理的 Hive catalog
USE CATALOG hive_catalog;

CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;

-- Kafka 源表,作为通用表存储在 Hive catalog 中
CREATE TABLE kafka_events (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '${KAFKA_BROKERS}', -- 替换为您的 Kafka brokers
  'properties.group.id' = 'gravitino-flink-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

关于通用表的关键说明:

  • HiveCatalog 能够管理 Hive 兼容表和通用表。Hive 兼容表以 Hive 原生格式存储,可直接从 Hive 查询。
  • 通用表是 Flink 特有的概念。Hive 虽然能在 Metastore 中看到其元数据,但通常无法解析表内容,从 Hive 侧查询的行为是未定义的。
  • 若要使用默认 dialect 创建 Hive 兼容表,请设置 'connector' = 'hive'。若使用 Hive dialect,则无需指定 connector 属性。
  • 在 Gravitino 中,通用表的 Schema 和分区键会以 flink.* 属性形式存储在 Hive Metastore。当 connector=hive 时,表被视为具有原生 Hive Schema 的 Hive 兼容表。

步骤 6:创建 Paimon 结果表

USE CATALOG paimon_catalog;

CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;

CREATE TABLE paimon_user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP_LTZ(3)
);

步骤 7:将数据从 Kafka 流式写入 Paimon

SET 'execution.checkpointing.interval' = '10 s';

INSERT INTO paimon_catalog.streaming_db.paimon_user_behavior
SELECT user_id, item_id, behavior, ts
FROM hive_catalog.streaming_db.kafka_events;

若 Kafka 中 user_behavior 主题持续有数据流入,Flink 会实时将其写入 Paimon 表。
注意:流式写入 Paimon 依赖定期 Checkpoint 才能完成数据提交,不设置 Checkpoint 将导致数据无法持久化。

代码示例

Kafka 示例消息(JSON 格式):

{"user_id": 1, "item_id": 1001, "behavior": "click"}
{"user_id": 2, "item_id": 1002, "behavior": "buy"}

故障排查

以下是常见问题及对应解决方案:

  • Flink 中 Catalog 不可见:首先检查 flink-conf.yamltable.catalog-store.* 配置是否正确,然后确认 Gravitino 服务器网络可达。
  • ClassNotFoundException:确保 Gravitino Connector、Kafka Connector 和 Paimon JAR 均已放置到 FLINK_HOME/lib 目录下。
  • java.net.ConnectException: Connection refused(执行 INSERT INTO 时):Flink SQL 客户端无法访问 JobManager 的 REST 端点(默认 localhost:8081)。使用 ${FLINK_HOME}/bin/start-cluster.sh 启动集群,并通过 curl http://localhost:8081/overview 验证连通性。
  • 作业处于 RUNNING 状态但 Paimon 中无新数据:确保流处理模式下已启用 Checkpoint(例如 SET 'execution.checkpointing.interval' = '10 s';),并在 Flink Web UI 或 /jobs//checkpoints 页面查看 Checkpoint 进度。
  • 作业处于 RUNNING 状态但重启后预期记录被跳过:Kafka 偏移量由 properties.group.id 管理。如需重新消费,请更换一个新的 group id(例如 gravitino-flink-demo-v2)。
  • 表未找到:使用完全限定名引用表,例如 hive_catalog.streaming_db.kafka_eventspaimon_catalog.streaming_db.paimon_user_behavior

恭喜

至此,你已成功完成一个完整的 Gravitino Flink 流式处理实战。当前环境具备以下能力:

  • 妥善配置的 Gravitino Flink Connector,实现统一的 Catalog 访问
  • 在 Gravitino 中注册的 Hive 与 Paimon Catalog,可通过 Flink SQL 直接操作
  • 一个稳定运行的流式管道,从 Kafka 读取数据并持续写入 Paimon
  • 对 HiveCatalog 中通用表与 Hive 兼容表差异的清晰理解

你的 Flink 环境现已准备好利用 Gravitino 在流式数据生态中完成统一的元数据管理。

延伸阅读

如需更高级的配置和详细文档:

  • 查阅 Gravitino Flink Connector 文档,了解更多配置选项与最佳实践
  • 深入学习 Apache Flink SQL,掌握多样化的查询模式
  • 探索 Apache Paimon with Flink,解锁 Paimon 的特有功能

下一步

  • 尝试 Gravitino 的 Iceberg Catalog,参考 03-iceberg-catalog/README.md
  • 使用 Trino 执行联邦查询,参考 06-trino-query/README.md

Apache Gravitino 正处于快速迭代期,本文基于 1.1.0 版本撰写。若遇到问题,建议查阅官方文档或在社区直接提交 Issue。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策