开发者社区> 易虹> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

HBase2 使用协处理器删除指定qualifier的全部数据

简介:
+关注继续查看

用户画像的场景中,通常会开发很多标签,每个标签作为一个qualifier,其中有一些不再使用后需要下线,但hbase提供的delete相关api都只能针对单行,要清理某个qualifier的全部数据不太容易,这里提供一个基于协处理器的实现方案;

hbase对于compact过程提供了以下5个hook可以嵌入自定义代码:

  • preCompactSelection
  • postCompactSelection
  • preCompactScannerOpen
  • preCompact
  • postCompact

而preCompact会在创建了storeScanner之后读取数据之前调用,因此这里的思路就是对scanner进行代理,创建一个新的scanner实现其next方法,进而对读取到的原始数据进行加工;

代码如下,参考了hbase-examples模块中的ValueRewritingObserver类:

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);

  private byte[] qualifierToDelete = null;
  private Bytes.ByteArrayComparator comparator;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    qualifierToDelete = Bytes.toBytes(renv.getConfiguration().get("qualifier.to.delete"));
    comparator = new Bytes.ByteArrayComparator();
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    InternalScanner modifyingScanner = new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        for (int i = 0; i < result.size(); i++) {
          Cell c = result.get(i);
          byte[] qualifier = CellUtil.cloneQualifier(c);
          if (comparator.compare(qualifier, qualifierToDelete) == 0) {
            result.remove(i);
          }
        }
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };

    return modifyingScanner;
  }
}

打成jar包上传到hdfs;

以下是简单的测试过程展示;

create 'cp_test','f'

put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'


hbase(main):015:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q1, timestamp=1590567958995, value=123                                                                 
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q1, timestamp=1590567959048, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123        
 

alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDelexxxtingObserver|1024|qualifier.to.delete=q1'

flush 'cp_test'

major_compact 'cp_test'

hbase(main):017:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123  

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
面试复习系列【python-数据处理-2 】
面试复习系列【python-数据处理-2 】
26 0
大数据处理 | HDFS文件系统配置及基本使用
Spark是目前Apache三大主流开源分布式大数据处理框架之一,它具有低时延、速度快、通用性强、生态系统等优点,此外它不仅可以用于数据的批计算,还可以用于数据的流计算,这让它倍受欢迎。因此,我准备用两篇文章介绍一下Spark集群环境的搭建和基本使用,由于本文是基于HDFS文件存储系统,因此第一篇文章会详细介绍Hadoop集群的搭建与基本使用,第二篇文章会介绍Spark集群的搭建与基本使用。
89 0
JS的数组去重处理(十九)
JS的数组去重处理(十九)
49 0
Spring 基于注解(annotation)的配置之@Qualifier注解
Spring 基于注解(annotation)的配置之@Qualifier注解
31 0
德哥PG系列课程直播(第14讲):PostgreSQL 数据清洗、采样、脱敏、批处理、合并
知识点 知识点:数据清洗、去重、采样、脱敏、批处理、合并 学习资料 1、PostgreSQL 数据采样与脱敏标签:PostgreSQL , 采样 , 脱敏PostgreSQL 巧妙的数据采样方法 2、PostgreSQL 数据去重大法标签:PostgreSQL , 去重 , 单列去重 , 多列去重.
12887 0
Spring的qualifier标签
@Autowired是根据类型进行自动装配的。如果当Spring上下文中存在不止一个UserDao类型的bean时,就会抛出BeanCreationException异常;如果Spring上下文中不存在UserDao类型的bean,也会抛出BeanCreationException异常。
1228 0
新手学JAVA(六)----处理随机性的数据
<div class="markdown_views"> <p></p> <div class="toc"> <ul> <li><a href="#%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86">基础知识</a></li> <li><a href="#%E5%AE%9E%E4%BE%8B">实例</a></li> </ul> </div> <hr
990 0
+关注
20
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载