Phoenix映射HBase时间戳的一种实现

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
简介: 本文介绍了Phoenix映射HBase时间戳的一种实现,希望抛砖引玉,读者根据自己的实际情况设计其他的实现方式。

HBase用户福利

新用户9.9元即可使用6个月云数据库HBase,更有低至1元包年的入门规格供广大HBase爱好者学习研究,更多内容请参考链接

官方实现

Apache Phoenix从4.6版本开始,提供了ROW_TIMESTAMP标签,来映射HBase的原生时间戳。但使用起来有以下限制:

  • 只有主键中的TIME, DATE, TIMESTAMP, BIGINT, UNSIGNED_LONG类型的字段才能设置成ROW_TIMESTAMP
  • 只能有一个主键列能被设置成ROW_TIMESTAMP
  • ROW_TIMESTAMP标志的字段不能为null值
  • 只有在建表的时候,某一列才能被设置成ROW_TIMESTAMP
  • ROW_TIMESTAMP标志的列不能为负数

除了上面使用上的限制,还有应用场景的限制。根据上面的描述,ROW_TIMESTAMP字段有以下几种形式。

  • 业务主键在前
    业务主键在前
  • ROW_TIMESTAMP字段在前
    ROW_TIMESTAMP字段在前
  • 只有ROW_TIMESTAMP字段
    只有ROW_TIMESTAMP字段

我们来看下各个形式的优劣

  • 业务主键在前。无论ROW_TIMESTAMP字段如何取值,都可以通过业务主键1进行单点查询,即在知道业务主键1的情况下是可以通过前缀精确快速的查询的。
  • ROW_TIMESTAMP字段在前。如果不知道某条数据对应的ROW_TIMESTAMP字段值,则无法通过主键查询;如果通过业务主键可以映射ROW_TIMESTAMP字段值,虽然可以通过主键查询,但该字段将无法修改。因为修改就意味着当前记录删除,重新插入。
  • 只有ROW_TIMESTAMP字段。在一些时序数据比较常见,也就是没有业务主键,不会也不便通过主键查询,一般都是范围扫描。

其实官方提供的ROW_TIMESTAMP字段实现,最大的问题就是原有记录不能更新,只能删除、然后插入,这就极大的限制了它的应用场景。

我们的实现

背景

我们用Phoenix存储了所有需要实时查询的表,写Phoenix-Sql查询当前最新的数据。基本架构如下:

基本架构

问题

正常情况下,实时抽取MySQL的binlog,写入Phoenix;每天会有Hive批量抽取MySQL数据,对Phoenix进行校验、补数。
实时写入时,需要考虑binlog更新的顺序,至少要做到MySQL原数据每行更新的顺序;离线补数时,需要考虑是否会覆盖实时写入的数据。

实时写入

实时写入的顺序,大都由CDC(canal、debezium等)控制。针对每一条数据的更新,CDC都会对“表名+主键”进行Hash,路由到Kafka对应的分区。其实针对某个表某条记录的更新,消费时是有严格的顺序的。但如果后期更改kafka分区个数,就会稍微麻烦点。如果不停服更新,就意味着同一条记录的不同更新,分布在不同的分区,也就不能保证严格的顺序,插入Phoenix表就会出现覆盖的问题。如果停服更新,就需要先停掉CDC,等消费者把数据消费完,然后再调整分区,启动消费者,这样才能避免相互覆盖的问题。

实时写入还有一个潜在的问题,那就是数据丢失。不管是网络抖动,还是组件的健壮性,都会造成数据丢失。一旦发生数据丢失,就需要校验、补数的逻辑。

离线补数

离线补数就是为了防止出现实时数据丢失的问题。离线补数包含校验和补数两个步骤。

  • 校验。拿当前全量或增量数据,与Phoenix表中相同主键的数据进行比对,确定Phoenix是否丢数或丢失更新。
    • 丢数就是Phoenix应该有的数据却没有
    • 丢失更新就是Phoenix的数据不是最新的
  • 补数。根据上一步骤计算的丢失的数据或更新,写入Phoenix

离线补数看似完美,但最大的问题就是,校验和补数是两个步骤,也就是说不在一个事务里面。有可能某条数据在校验阶段,的确是丢失的,但在校验之后、补数之前,该条数据又被写到Phoenix表了,那么在补数之后,该数据又被更新成旧数据了。

解决方案

细心的读者会发现,使用官方提供的ROW_TIMESTAMP是无法很好的解决数据乱序覆盖的问题的。
那么,究竟该怎么办呢?有没有一种方案能完美的解决上面的问题呢?下面是我解决这个问题的思路和具体实现。

思路

熟悉HBase的读者一定知道,HBase插入或更新数据的时候是可以指定时间戳(版本号)的,而且HBase查询时默认显示时间戳最大的数据。那如果Phoenix在根据主键写入数据时,能把该条数据的更新时间写入HBase的时间戳字段,是不是就能解决相互覆盖的问题了呢?
的确能。
其实每一条更新都是数据的一个版本。如果写入时能指定时间戳,就意味着指定了数据的版本,无论每个更新到达的顺序是怎样的,Phoenix读取时都会读取最新的数据。
如果能实现,那么Kafka重新设定分区个数和离线补数将不再需要考虑覆盖的问题。
但Phoenix目前并没有实现上面逻辑的机制,我们需要对其进行简单的升级。

实现方案

其实在Phoenix官方实现中,有一个CurrentSCN属性,它可以控制每一次DDL、DML、QUERY的时间戳的,也就是在插入或更新时,会根据CurrentSCN的值设定当前数据对应的HBase的时间戳。但很不幸,它只能控制每一次commit的数据,也就是无法精确控制每一条数据的时间戳。当然了,如果每一条数据Upsert时都设置CurrentSCN,然后commit也是可以解决问题的,但这就无法进行批量提交,会一定程度的影响性能。

其实我在实现时也是参考了CurrentSCN属性的原理。
经过分析,我找到了MutationState类的generateMutations方法的下面一段代码。

PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);

上面的代码其实是创建了一条数据,后续的upset的数据就是由此而来。根据timestampToUse命名可以猜想,它就是该条数据的时间戳。


/**
     * Creates a new row at the specified timestamp using the key
     * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
     * and the optional key values specified using values.
     * @param ts the timestamp that the key value will have when committed
     * @param key the row key of the key value
     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
     * @param values the optional key values
     * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
     * generate the Row to send to the HBase server.
     * @throws ConstraintViolationException if row data violates schema
     * constraint
     */
    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);

由newRow的描述可以确定我们的猜想,timestampToUse就是当前数据的时间戳。

根据调用链,我们找到了timestampToUse赋值最近的地方:UpsertCompiler.setValues方法,里面有一个RowTimestampColInfo类型的rowTsColInfo字段。其实还是找到timestampToUse最初的地方,也就是获取CurrentSCN的代码段,但考虑到不对原有的CurrentSCN功能过多干涉,我们选择优化UpsertCompiler.setValues方法。下面是改造后的代码片段:

for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
            byte[] value = values[j];
            PColumn column = table.getColumns().get(columnIndexes[i]);
            if (SchemaUtil.isPKColumn(column)) {
                pkValues[pkSlotIndex[i]] = value;
                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
                    if (!useServerTimestamp) {
                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                        if (rowTimestamp < 0) {
                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                        }
                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
                    } 
                }
            } else {
                columnValues.put(column, value);
                columnValueSize += (column.getEstimatedSize() + value.length);
            }
            if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
                rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
                if (rowTimestamp < 0) {
                    throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
                }
                rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
            }
        }

在处理每行数据每个字段值的时候,判断当前字段类型是否为PRowts类型,如果是,则根据该值创建RowTimestampColInfo。这样就达到了根据数据改变HBase时间戳的目的。

考虑到快速、简单的实现PRowts类型,我们选择将PRowts设定为Long类型的别名,其实就是根据PLong类创建PRowts,二者的逻辑完全一致。只不过个别参数名称不同。
下面是PRowts的默认构造函数。


private PRowts() {
        super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
    }

至此,我们就实现了将数据的时间戳映射到HBase的时间戳的功能。简单来说分为两步:

  1. 新增PRowts类型。创建表时,指定某个字段为PRowts,该字段原始类型必须是long;或者修改字段的类型为PRowts。
  2. 根据数据构造HBase的Put命令时,将PRowts的值写入row timestamp

实现过程看似简单,但作者还是花了很大的精力阅读、梳理Phoenix的源码的,只有在了解的基础上才能进行改造、升级。
当然,限于篇幅还是有很多细节没有解释的,而且也不一定选择改造UpsertCompiler.setValues,读者可以根据实际情况自行实现。另外也可以扩展PRowts,使其支持其他时间类型的数据,比如TIME、DATE、TIMESTAMP、BIGINT。

HBase技术交流

欢迎加群进一步交流沟通:
image.png
image.png

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
SQL 分布式数据库 HIVE
Hbase二级索引_Hive on Hbase 及phoenix详解
Hbase二级索引_Hive on Hbase 及phoenix详解
89 0
|
分布式数据库 Hbase
|
SQL 物联网 关系型数据库
实时即未来,车联网项目之phoenix on hbase 即席查询【四】
实时即未来,车联网项目之phoenix on hbase 即席查询【四】
223 0
|
分布式计算 分布式数据库 Spark
Phoenix-基于HBase的低延迟操作 头歌——答案
Phoenix-基于HBase的低延迟操作 头歌——答案
443 0
|
存储 SQL 分布式数据库
phoenix连接hbase时的bug处理通用方法(亲测)
phoenix连接hbase时的bug处理通用方法(亲测)
771 0
|
SQL 搜索推荐 Java
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
555 0
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
|
SQL 存储 缓存
【Hive】如何在 Hive 中创建外部表映射 Hbase 中已存在的表
【Hive】如何在 Hive 中创建外部表映射 Hbase 中已存在的表
1207 0
|
SQL 存储 分布式数据库
hbase的表映射到hive中
1.本文主要说一下怎么把hbase中的表映射到hive中,说之前我们先简单说一下hive的内部表和外部表的区别; (1),被external关键字修饰的表是外部表,没有被external关键字修饰的表是内部表. (2),内部表数据由Hive自身管理,外部表数据由HDFS管理. (3),内部表数据存储的位置是hive.metastore.warehouse.dir.外部表数据的存储位置由自己确定. (4),删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除;
|
机器学习/深度学习 SQL 分布式计算
云HBase Phoenix索引构建最佳实践
介绍三种的不同的索引构建方法及其适用场景
2671 0