HBase2 使用协处理器删除指定qualifier的全部数据

简介:

用户画像的场景中,通常会开发很多标签,每个标签作为一个qualifier,其中有一些不再使用后需要下线,但hbase提供的delete相关api都只能针对单行,要清理某个qualifier的全部数据不太容易,这里提供一个基于协处理器的实现方案;

hbase对于compact过程提供了以下5个hook可以嵌入自定义代码:

  • preCompactSelection
  • postCompactSelection
  • preCompactScannerOpen
  • preCompact
  • postCompact

而preCompact会在创建了storeScanner之后读取数据之前调用,因此这里的思路就是对scanner进行代理,创建一个新的scanner实现其next方法,进而对读取到的原始数据进行加工;

代码如下,参考了hbase-examples模块中的ValueRewritingObserver类:

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);

  private byte[] qualifierToDelete = null;
  private Bytes.ByteArrayComparator comparator;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    qualifierToDelete = Bytes.toBytes(renv.getConfiguration().get("qualifier.to.delete"));
    comparator = new Bytes.ByteArrayComparator();
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    InternalScanner modifyingScanner = new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        for (int i = 0; i < result.size(); i++) {
          Cell c = result.get(i);
          byte[] qualifier = CellUtil.cloneQualifier(c);
          if (comparator.compare(qualifier, qualifierToDelete) == 0) {
            result.remove(i);
          }
        }
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };

    return modifyingScanner;
  }
}

打成jar包上传到hdfs;

以下是简单的测试过程展示;

create 'cp_test','f'

put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'


hbase(main):015:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q1, timestamp=1590567958995, value=123                                                                 
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q1, timestamp=1590567959048, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123        
 

alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDelexxxtingObserver|1024|qualifier.to.delete=q1'

flush 'cp_test'

major_compact 'cp_test'

hbase(main):017:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123  
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
162 0
|
SQL 存储 分布式数据库
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
277 0
|
1月前
|
缓存 监控 Shell
如何使用 HBase Shell 进行数据的实时监控和备份?
如何使用 HBase Shell 进行数据的实时监控和备份?
|
1月前
|
Shell 分布式数据库 Hbase
如何使用 HBase Shell 进行数据的批量导入和导出?
如何使用 HBase Shell 进行数据的批量导入和导出?
|
5月前
|
存储 分布式数据库 数据库
Hbase学习二:Hbase数据特点和架构特点
Hbase学习二:Hbase数据特点和架构特点
88 0
|
5月前
|
缓存 监控 Shell
使用 HBase Shell 进行数据的实时监控和备份
使用 HBase Shell 进行数据的实时监控和备份
|
5月前
|
Shell 分布式数据库 Hbase
使用 HBase Shell 进行数据的批量导入和导出
使用 HBase Shell 进行数据的批量导入和导出
647 6
|
4月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
50 0
|
5月前
|
存储 Java 分布式数据库
HBase构建图片视频数据的统一存储检索
HBase构建图片视频数据的统一存储检索
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
142 0