Phoenix映射HBase时间戳实现:专业实战指南与代码详解

2026-06-16阅读 0热度 0
时间戳

HBase用户福利

新用户9.9元即可使用6个月云数据库HBase,更有低至1元包年的入门规格供广大HBase爱好者学习研究,更多内容请参考链接

先说几个核心判断。Phoenix从4.6版本开始官方提供了ROW_TIMESTAMP标签,用来映射HBase的原生时间戳,但这个方案在实际使用中限制还真不少。具体来说,只有主键中的TIME、DATE、TIMESTAMP、BIGINT、UNSIGNED_LONG类型的字段才能被设置成ROW_TIMESTAMP;而且只能有一个主键列能被设置成ROW_TIMESTAMP;这个标志的字段还不能为null值;更关键的是,只有在建表的时候才能指定,并且这个列不能为负数。

除了这些使用层面的限制,实际应用场景中也挺让人头疼的。根据官方设计,ROW_TIMESTAMP字段主要有以下几种形式。


业务主键在前
业务主键在前
ROW_TIMESTAMP字段在前
ROW_TIMESTAMP字段在前
只有ROW_TIMESTAMP字段
只有ROW_TIMESTAMP字段

那么,这几种形式各自的优劣如何?


业务主键在前。无论ROW_TIMESTAMP字段怎么取值,都能通过业务主键1做单点查询,这意味着在知道业务主键1的情况下可以快速精确查询,很实用。
ROW_TIMESTAMP字段在前。如果不知道某条数据对应的ROW_TIMESTAMP值,那就没法通过主键查了;反过来如果通过业务主键能映射这个值,虽然可以查了,但要注意这个字段将无法修改——修改就等于删除旧记录再重新插入。
只有ROW_TIMESTAMP字段。这种形式在时序数据中比较常见,也就是说没有业务主键,一般不做单点查询而是范围扫描。

其实官方ROW_TIMESTAMP方案最大的痛点,就是原有记录不能更新,只能先删除再插入,这直接限制了它的应用面。

我们的实现

背景

团队用Phoenix存储了所有需要实时查询的表,通过写Phoenix-SQL来查询当前最新数据。整体的架构是这样的:

基本架构

问题

正常逻辑下,实时抽取MySQL的binlog,写入Phoenix;每天还会有Hive批量抽取MySQL数据来做校验和补数。实时写入时,必须考虑binlog更新的顺序,至少要做到与MySQL原数据每行更新的顺序一致;离线补数时,需要确保不覆盖实时写入的数据。

实时写入

实时写入的顺序,多数由CDC(比如canal、debezium)控制。针对每一条数据的更新,CDC会按“表名+主键”进行哈希,然后路由到Kafka对应的分区。也就是说,某个表某条记录的更新在消费时是有严格顺序的。但问题在于,后期如果更改Kafka分区数就会比较棘手。如果不停服更新,同一条记录的不同更新可能会跑到不同分区,顺序就无法保证了,插入Phoenix时就会出现覆盖问题。反过来,如果停服更新,就需要先停掉CDC,等消费者消费完,再调整分区,最后重启消费者——这样才能避免相互覆盖。

实时写入还有个潜在风险,那就是数据丢失。无论是网络抖动还是组件健壮性,都可能导致丢数据。一旦发生,就需要走校验和补数的逻辑。

离线补数

离线补数就是用来兜底实时数据丢失的。它包含两个步骤:校验和补数。


校验。拿当前全量或增量的数据,与Phoenix表中相同主键的数据比对,看Phoenix有没有丢数或缺失更新。丢数就是Phoenix应该有的数据却没出现;缺失更新则是数据不是最新的。
补数。根据上一步计算出来的丢失数据或更新,写入Phoenix。

离线补数听起来挺完美,但最大的问题是:校验和补数是两个步骤,不在一个事务里。有可能某条数据在校验阶段确实丢了,但校验之后、补数之前,这条数据又被写回来了。那么补数动作一执行,反而把最新数据给覆盖成旧数据了。

解决方案

看到这里应该能明白,使用官方提供的ROW_TIMESTAMP方案,是没法很好解决数据乱序覆盖问题的。那究竟该怎么办?有没有一种方案能一劳永逸地解决上面所有问题?下面分享一下解决思路和具体实现。

思路

熟悉HBase的朋友一定知道,HBase在插入或更新数据时可以指定时间戳(版本号),而且查询时默认显示时间戳最大的那条数据。那么,如果Phoenix在根据主键写入数据时,能把该条数据的更新时间直接填入HBase的时间戳字段,是不是就能解决覆盖问题了呢?确实可以。

其实每一条更新都是数据的一个版本。如果写入时能指定时间戳,就意味着指定了数据的版本,无论更新到达的顺序如何,Phoenix读到的始终是最新的数据。如果真能实现,那么无论是Kafka重新分区还是离线补数,都不再需要担心覆盖问题了。可惜,Phoenix目前没有这个机制,得我们给它做个简单的升级。

实现方案

其实Phoenix官方有一个CurrentSCN属性,可以控制每次DDL、DML、QUERY的时间戳。也就是说,插入或更新时,它会根据CurrentSCN的值来设定当前数据对应的HBase时间戳。但遗憾的是,它只能控制每次提交的整批数据,没法精确控制每一条数据。当然,如果每Upsert一条数据都设一次CurrentSCN再提交,理论上也能解决问题,但这样就没法做批量提交了,性能会受影响。

实现时参考了一下CurrentSCN的原理。经过分析,在MutationState类的generateMutations方法里找到了这么一段代码:

PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);

这段代码是创建一条数据,后续的Upsert数据都由此产生。从命名看,timestampToUse应该就是这条数据的时间戳。

/**
 * Creates a new row at the specified timestamp using the key
 * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
 * and the optional key values specified using values.
 * @param ts the timestamp that the key value will ha ve when committed
 * @param key the row key of the key value
 * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
 * @param values the optional key values
 * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
 * generate the Row to send to the HBase server.
 * @throws ConstraintViolationException if row data violates schema
 * constraint
 */
PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);

从newRow的描述来看,timestampToUse确实是当前数据的时间戳。顺着调用链,找到了timestampToUse最近的一次赋值位置:UpsertCompiler.setValues方法,里面有RowTimestampColInfo类型的rowTsColInfo字段。不过为了不干扰原有的CurrentSCN功能,选择了优化UpsertCompiler.setValues方法。改造后的代码片段如下:

for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
    byte[] value = values[j];
    PColumn column = table.getColumns().get(columnIndexes[i]);
    if (SchemaUtil.isPKColumn(column)) {
        pkValues[pkSlotIndex[i]] = value;
        if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
            if (!useServerTimestamp) {
                PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                if (rowTimestamp < 0) {
                    throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                }
                rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
            }
        } 
    } else {
        columnValues.put(column, value);
        columnValueSize += (column.getEstimatedSize() + value.length);
    }
    if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
        if (rowTimestamp < 0) {
            throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
        }
        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
    }
}

处理每行数据每个字段时,判断当前字段类型是否为PRowts,如果是,就根据该值创建RowTimestampColInfo。这样就实现了根据数据动态改变HBase时间戳的目标。

为了快速实现PRowts类型,选择把它设定为Long类型的别名,也就是说基于PLong类创建PRowts,逻辑完全一样,只是个别参数名不同。下面是PRowts的默认构造函数:

private PRowts() {
    super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
}

至此,就实现了将数据时间戳映射到HBase时间戳的功能。整个过程可以归纳为两步:


新增PRowts类型。创建表时指定某个字段为PRowts,该字段原始类型必须是long;或者修改现有字段的类型为PRowts。
根据数据构造HBase的Put命令时,将PRowts的值写入row timestamp。

实现过程看上去简单,但背后确实花了不少精力去阅读和梳理Phoenix源码,只有真正理解了才能改造升级。篇幅所限,很多细节没法展开说。其实也不一定非要改造UpsertCompiler.setValues,读者完全可以根据实际情况自行实现。另外还可以扩展PRowts,使其支持TIME、DATE、TIMESTAMP、BIGINT等其他时间类型的数据。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策