- HBase的协处理器涵盖了两种类似关系型数据库中的应用场景:存储过程和触发器,所以协处理器也分为两种:用来实现存储过程功能的终端 程序EndPoint和用来实现触发器功能的观察者Observers
Observer
- 在hbase2.x的时候,按照之前的继承
BaseRegionObserver
是不起作用的,经过我的测试,这个类好像是被移除了,我使用的版本是2.1.1
-
新的实现可以查看接口
Coprocessor
来查看,我们来看一下/** * Base interface for the 4 coprocessors - MasterCoprocessor, RegionCoprocessor, * RegionServerCoprocessor, and WALCoprocessor. * Do NOT implement this interface directly. Unless an implementation implements one (or more) of * the above mentioned 4 coprocessors, it'll fail to be loaded by any coprocessor host. * * Example: * Building a coprocessor to observe Master operations. * <pre> * class MyMasterCoprocessor implements MasterCoprocessor { * @Override * public Optional<MasterObserver> getMasterObserver() { * return new MyMasterObserver(); * } * } * class MyMasterObserver implements MasterObserver { * .... * } * </pre> * Building a Service which can be loaded by both Master and RegionServer * <pre> * class MyCoprocessorService implements MasterCoprocessor, RegionServerCoprocessor { * @Override * public Optional<Service> getServices() { * return new ...; * } * } */
-
这是
Coprocessor
接口的介绍,非常清楚的介绍了协处理器的用法,那么我就简单的翻译一下下,自己的英语很渣的有四个接口的父类是coprocessors,他们分别是MasterCoprocessor, RegionCoprocessor, RegionServerCoprocessor, WALCoprocessor. 不要去直接实现coprocessors接口,除非你的实现类实现了上述4个Observer 然后下面的就是一些具体的实现代码
- 经过之前的介绍,我们知道了Observer的作用就类似一个触发器,当指定操作也就是你实现的方法对应的操作被执行的时候,你定义的Observer逻辑就会去在HBase上去跑,以实现一些控制
- 当然上面的coprocessors的实现不知四个接口,还有一些其他实现类,比如
AccessController
,这个提供数据访问和管理的基本授权检查等操作,还有一些自己可以去查看一下 -
多说无益,直接上 需求
t1表中有f1列族 向f1中插入name,然后自动计算出name的hash作为此行的password列,不可单独添加password 可删除name,顺便一起删除password,不可单独删除password
-
maven
<dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.7</version> </dependency> </dependencies>
-
直接上 代码
public class MyRegionCoprocessor implements RegionCoprocessor { @Override public Optional<RegionObserver> getRegionObserver() { return Optional.ofNullable(new MyRegionObserver()); } private static class MyRegionObserver implements RegionObserver { private static final byte[] FAMILY = Bytes.toBytes("f1"); private static final byte[] COL_NAME = Bytes.toBytes("name"); private static final byte[] COL_PASSWORD = Bytes.toBytes("password"); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException { //如果插入的是password列,那么就报错 List<Cell> passwords = put.get(FAMILY, COL_PASSWORD); if (passwords.size() != 0 || !passwords.isEmpty()){ throw new IllegalArgumentIOException("Password insertion is not allowed !!"); } //如果添加的是name,那么就增加相对应的name的hash的password列 List<Cell> names = put.get(FAMILY, COL_NAME); names.forEach(name -> { int hash = Objects.hash(name); put.addColumn(FAMILY,COL_PASSWORD,Bytes.toBytes(hash)); }); } @Override public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException { //阻止删除密码,不可单独删除密码 List<Cell> passwords = delete.get(FAMILY, COL_PASSWORD); if (passwords.size() != 0 || !passwords.isEmpty()){ throw new IllegalArgumentIOException("Password insertion is not delete !!"); } //到这就代表不是密码了,获取name,然后删除对应的password List<Cell> names = delete.get(FAMILY, COL_NAME); names.forEach(name -> { delete.addColumns(FAMILY,COL_PASSWORD); }); } } }
- 如果你对上面的
Optional
类感到陌生,你就记住他是用来判断值是否是空的就可以,如果想进一步了解,你可以参考我的专辑Java8学习,希望能帮助到你 - 上面的代码逻辑还是比较清晰的,在外部类的重写的方法
getRegionObserver
中返回了我们自己实现的Observer类,当然这个方法是不用必须重写的,当我们需要实现自己的Observer的时候必须重写getRegionObserver
,而当我们实现自己的EndPoint的时候,就需要重写getEndpointObserver
方法了,还有一个在RegionCoprocessor
中的方法是getBulkLoadObserver
,从名字我们可以看到是批加载用的 - 当我们实现好了自己的Observer的时候,我们就需要部署到HBase集群中去看看效果,在网上我并没有找到类似本地测试Observer的办法,这可真是麻烦.
-
开始部署,当然打成jar,我就不说了,打完后,部署到HBase的方式有两种,一种是通过配置文件的方式去部署,一种是通过动态的方式去部署,前者称为静态部署,后面即动态部署, 静态部署 我将简单介绍一下,如下,而自己使用的是动态部署的方法
在hbase-site.xml中配置静态部署 <property> <name>hbase.coprocessor.region.classes</name> #这是配置region的协处理器,对应的就是:hbase.coprocessor.master.classes <value>你的全类名</value> #如果想同时配置多个协处理器,可以用逗号分隔多个协处理器的类名 </property> 相关的配置 hbase.coprocessor.enabled:是否启用协处理器机制,默认true,即开启 hbase.coprocessor.user.enabled:即是否允许用户动态配置,如果为false,就只能在xml中配置了 hbase.coprocessor.wal.classes:即监控wal的 hbase.coprocessor.abortonerror:即如果个别协处理器启动失败整个hbase是否启动,默认是如果有的协处理器启动失败,那么hbase就不起了 需要注意的是 1.配置顺序决定了执行顺序 2. 所有协处理器是以系统级优先级加载的 (优先级一会儿会提到) 3. 重启集群起作用
-
动态配置
- 首先你如果使用我上面的代码测试,那么请
create 't1','f1'
- 然后将你的jar上传到集群,然后上传到hdfs,如果你使用我的代码测试,请直接将jar上传到
hdfs:///hbase.jar
- 然后
disable 't1'
- 然后使用特定命令在表上配置协处理器:
alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|''
- 再然后
enabled 't1'
- 然后
desc 't1'
,你就会看到
hbase(main):086:0> desc 't1' Table t1 is ENABLED #注意这里代表已经配置上去了,如果没有配置上的话,就是 : 紧接着COLUMN FAMILIES DESCRIPTION 字眼,并没有下面的说明 t1, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://hbase.jar|qidai.MyRegionCoprocessor|1001|'} COLUMN FAMILIES DESCRIPTION
- 到了这一步其实就可以了,然后在t1表中尽情测试需求就好了,我测试的时候就很简单的测试了一下,如果你使用我的代码测试,如果有问题,请及时指正哈!!!如果你发现你的动态配置貌似没起作用,那么请重启一下对应的RegionServer,我出现了这个问题,估计是我的笔记本内存上限了,没反应过来??????
- 首先你如果使用我上面的代码测试,那么请
-
到这就该 说说优先级 了,如上如果配置了处理器,多出来的那一行我们分析一下
alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|' alter '表名','coprocessor'=>'hdfs地址|你的全类名|优先级|传入给你实现类的参数' 这里的优先级是0是最高的,以整数来表示的,越小值越先被执行
-
好了到这之后,大概的一些问题已经解决了,那么怎么将Observer给 卸载 呢 ?
- 静态部署的话,那么删除对应的xml内容,然后重启HBase即可
- 动态部署的话,我们可以先
disable
,然后使用alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
来进行卸载,在最后面的coprocessor$1
需要自己改一下,看看自己的处理器名是啥就写啥,然后在enable
就行了
- 剩余的EndPoint的使用这篇暂时不介绍, 不过迟早会补上的 , 因为现在只对Avro了解一些, 而Protobuf 不太清楚, 恩..今天这篇涉及的只是Observer一些最基本的操作,是并没有涉及什么原理之类的, 所以在了解其原理之前这也是必备的知识, 如果本片文章对你有些许帮助那就点个赞吧哈哈