Phoenix映射HBase时间戳实现:专业实战指南与代码详解
HBase用户福利
新用户9.9元即可使用6个月云数据库HBase,更有低至1元包年的入门规格供广大HBase爱好者学习研究,更多内容请参考链接
先说几个核心判断。Phoenix从4.6版本开始官方提供了ROW_TIMESTAMP标签,用来映射HBase的原生时间戳,但这个方案在实际使用中限制还真不少。具体来说,只有主键中的TIME、DATE、TIMESTAMP、BIGINT、UNSIGNED_LONG类型的字段才能被设置成ROW_TIMESTAMP;而且只能有一个主键列能被设置成ROW_TIMESTAMP;这个标志的字段还不能为null值;更关键的是,只有在建表的时候才能指定,并且这个列不能为负数。
除了这些使用层面的限制,实际应用场景中也挺让人头疼的。根据官方设计,ROW_TIMESTAMP字段主要有以下几种形式。
业务主键在前
ROW_TIMESTAMP字段在前
只有ROW_TIMESTAMP字段
那么,这几种形式各自的优劣如何?
业务主键在前。无论ROW_TIMESTAMP字段怎么取值,都能通过业务主键1做单点查询,这意味着在知道业务主键1的情况下可以快速精确查询,很实用。
ROW_TIMESTAMP字段在前。如果不知道某条数据对应的ROW_TIMESTAMP值,那就没法通过主键查了;反过来如果通过业务主键能映射这个值,虽然可以查了,但要注意这个字段将无法修改——修改就等于删除旧记录再重新插入。
只有ROW_TIMESTAMP字段。这种形式在时序数据中比较常见,也就是说没有业务主键,一般不做单点查询而是范围扫描。
其实官方ROW_TIMESTAMP方案最大的痛点,就是原有记录不能更新,只能先删除再插入,这直接限制了它的应用面。
我们的实现
背景
团队用Phoenix存储了所有需要实时查询的表,通过写Phoenix-SQL来查询当前最新数据。整体的架构是这样的:
问题
正常逻辑下,实时抽取MySQL的binlog,写入Phoenix;每天还会有Hive批量抽取MySQL数据来做校验和补数。实时写入时,必须考虑binlog更新的顺序,至少要做到与MySQL原数据每行更新的顺序一致;离线补数时,需要确保不覆盖实时写入的数据。
实时写入
实时写入的顺序,多数由CDC(比如canal、debezium)控制。针对每一条数据的更新,CDC会按“表名+主键”进行哈希,然后路由到Kafka对应的分区。也就是说,某个表某条记录的更新在消费时是有严格顺序的。但问题在于,后期如果更改Kafka分区数就会比较棘手。如果不停服更新,同一条记录的不同更新可能会跑到不同分区,顺序就无法保证了,插入Phoenix时就会出现覆盖问题。反过来,如果停服更新,就需要先停掉CDC,等消费者消费完,再调整分区,最后重启消费者——这样才能避免相互覆盖。
实时写入还有个潜在风险,那就是数据丢失。无论是网络抖动还是组件健壮性,都可能导致丢数据。一旦发生,就需要走校验和补数的逻辑。
离线补数
离线补数就是用来兜底实时数据丢失的。它包含两个步骤:校验和补数。
校验。拿当前全量或增量的数据,与Phoenix表中相同主键的数据比对,看Phoenix有没有丢数或缺失更新。丢数就是Phoenix应该有的数据却没出现;缺失更新则是数据不是最新的。
补数。根据上一步计算出来的丢失数据或更新,写入Phoenix。
离线补数听起来挺完美,但最大的问题是:校验和补数是两个步骤,不在一个事务里。有可能某条数据在校验阶段确实丢了,但校验之后、补数之前,这条数据又被写回来了。那么补数动作一执行,反而把最新数据给覆盖成旧数据了。
解决方案
看到这里应该能明白,使用官方提供的ROW_TIMESTAMP方案,是没法很好解决数据乱序覆盖问题的。那究竟该怎么办?有没有一种方案能一劳永逸地解决上面所有问题?下面分享一下解决思路和具体实现。
思路
熟悉HBase的朋友一定知道,HBase在插入或更新数据时可以指定时间戳(版本号),而且查询时默认显示时间戳最大的那条数据。那么,如果Phoenix在根据主键写入数据时,能把该条数据的更新时间直接填入HBase的时间戳字段,是不是就能解决覆盖问题了呢?确实可以。
其实每一条更新都是数据的一个版本。如果写入时能指定时间戳,就意味着指定了数据的版本,无论更新到达的顺序如何,Phoenix读到的始终是最新的数据。如果真能实现,那么无论是Kafka重新分区还是离线补数,都不再需要担心覆盖问题了。可惜,Phoenix目前没有这个机制,得我们给它做个简单的升级。
实现方案
其实Phoenix官方有一个CurrentSCN属性,可以控制每次DDL、DML、QUERY的时间戳。也就是说,插入或更新时,它会根据CurrentSCN的值来设定当前数据对应的HBase时间戳。但遗憾的是,它只能控制每次提交的整批数据,没法精确控制每一条数据。当然,如果每Upsert一条数据都设一次CurrentSCN再提交,理论上也能解决问题,但这样就没法做批量提交了,性能会受影响。
实现时参考了一下CurrentSCN的原理。经过分析,在MutationState类的generateMutations方法里找到了这么一段代码:
PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
这段代码是创建一条数据,后续的Upsert数据都由此产生。从命名看,timestampToUse应该就是这条数据的时间戳。
/**
* Creates a new row at the specified timestamp using the key
* for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
* and the optional key values specified using values.
* @param ts the timestamp that the key value will ha ve when committed
* @param key the row key of the key value
* @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
* @param values the optional key values
* @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
* generate the Row to send to the HBase server.
* @throws ConstraintViolationException if row data violates schema
* constraint
*/
PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
从newRow的描述来看,timestampToUse确实是当前数据的时间戳。顺着调用链,找到了timestampToUse最近的一次赋值位置:UpsertCompiler.setValues方法,里面有RowTimestampColInfo类型的rowTsColInfo字段。不过为了不干扰原有的CurrentSCN功能,选择了优化UpsertCompiler.setValues方法。改造后的代码片段如下:
for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
byte[] value = values[j];
PColumn column = table.getColumns().get(columnIndexes[i]);
if (SchemaUtil.isPKColumn(column)) {
pkValues[pkSlotIndex[i]] = value;
if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
if (!useServerTimestamp) {
PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
if (rowTimestamp < 0) {
throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
}
rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
}
}
} else {
columnValues.put(column, value);
columnValueSize += (column.getEstimatedSize() + value.length);
}
if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
if (rowTimestamp < 0) {
throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
}
rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
}
}
处理每行数据每个字段时,判断当前字段类型是否为PRowts,如果是,就根据该值创建RowTimestampColInfo。这样就实现了根据数据动态改变HBase时间戳的目标。
为了快速实现PRowts类型,选择把它设定为Long类型的别名,也就是说基于PLong类创建PRowts,逻辑完全一样,只是个别参数名不同。下面是PRowts的默认构造函数:
private PRowts() {
super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
}
至此,就实现了将数据时间戳映射到HBase时间戳的功能。整个过程可以归纳为两步:
新增PRowts类型。创建表时指定某个字段为PRowts,该字段原始类型必须是long;或者修改现有字段的类型为PRowts。
根据数据构造HBase的Put命令时,将PRowts的值写入row timestamp。
实现过程看上去简单,但背后确实花了不少精力去阅读和梳理Phoenix源码,只有真正理解了才能改造升级。篇幅所限,很多细节没法展开说。其实也不一定非要改造UpsertCompiler.setValues,读者完全可以根据实际情况自行实现。另外还可以扩展PRowts,使其支持TIME、DATE、TIMESTAMP、BIGINT等其他时间类型的数据。
