本文主要带大家了解Coprocessor的原理。首先从四个角度出发讲解了什么是Coprocessor,Coprocessor适合哪些使用场景。并详细分析了两种类型Coprocessor,分别介绍了Observer和Endpoint的实现及其区别。最后从实际应用角度介绍了Coprocessor在RsGroup中的具体使用。
演讲嘉宾简介:
陈杨,快手大数据高级研发工程师。毕业于浙江大学,现负责快手HBase的维护与研发,支持视频、特征、用户画像、IM等海量数据的存储。一直致力于大数据基础架构和Hadoop生态的学习与研发。同时在HBase与HDFS的基础上,和团队一起研发了大数据存储与分析相关系统,如blobstore、bitbase等。
本次直播视频精彩回顾,戳这里!https://yq.aliyun.com/live/945
以下内容根据演讲嘉宾视频分享整理而成。https://yq.aliyun.com/download/3468
本次的分享主要围绕以下三个方面:
一、Coprocessor简介及使用场景
二、Coprocessor实现
三、Coprocessor应用
一、Coprocessor简介及使用场景
Coprocessor是协处理器,一种实现类似触发器,AOP和计算本地性等扩展能力的框架。它主要分为两种类型,Observer和Endpoint。Observer相当于一个监听者,Endpoint相当于一种service。触发器和AOP的场景下使用Observer类型。计算本地性和扩展RsGroup都是通过Endpoint的方式来实现。
触发器。触发器主要出现在关系型数据库中。当用户写入一个数据或者读一个数据时就是一个事件触发了另外一个事件。比如,插入数据时,如果rowKey=‘A’,则将数据发送给Kafka,是一次触发事件。在实际使用场景中,发给Redis也是触发事件。
AOP。AOP是一种切面编程,即在一件事情发生的前或后拦截请求,并对这一请求进行统一处理。在MVC中,一个典型的例子是权限认证。用户请求一个Service的URL,需要统一认证。在HBase中,在写入之前,在rowKey前面添加‘001’,即直接修改rowKey,就是属于AOP。AOP和触发器存在区别,它们不完全相等但也没有特别明确的区分。与关系型数据库相比,在HBase 的Coprocessor中触发器需要实现方法的原子性,而且触发事件的原子性一般比较困难。
计算本地性。计算本地性指聚合或计算同一个region上的一批数据。数据在同一个Region Server上,需要对这一批数据进行计算,此时便会考虑到Coprocessor。其优点是不需要将数据请求放到真正请求的service里面,可以降低网络开销,尤其是service connect端的日程开销。比如,对一个table做count运算。阿里云开发了一个对bitmap做运算的系统,叫bitbase。一个大的bitmap是一个bit数组可以达到十亿+位。目前一个bitmap的笔头有15亿位,切成一块一块放在HBase中。两个bitmap之间做与、或、非、异或的运算,直接存到HBase的Coprocessor里。
扩展RsGroup。Coprocessor可以扩展HBase接口能力且不影响主流程。如果用户想实现增删改查之外其它接口,这些接口可能即不是table的接口,也不是Master的接口,那么Coprocessor是一个较好的选择。Coprocessor可以通过封装服务逻辑的方式,在HBase里面重新建立一个RPC服务。
1. Observer
- MasterObserver。 Master 是HBaseMaster角色。Master做为监听者,监听的事件包括post MoveServers,post MoveTables,postTruncateTable,postTableFlash和postSnapshot等。Truncate可能会删数据,清理数据,或者重新分区。post表示Truncate一个table之后要做的事情,如打一行日志。另外还有pre方法,pre相当于preTruncateTable,指在Truncate之前做的事情。
- RegionServerObserver。 包括postCreateReplicatoin方法,表示开启了Peplicatoin之后要做的事情。以及preStopRegionServer,指在stop RegionServer之前需要做的事情。还包括rollBack,merge等。与RegionServer相关的所有事件都在RegionServerObserver里面。
- RegionObserver。 前面所提到的count一个table,改的rowKey的方法正是在RegionObserver这一层进行处理。 向一个region put一个数据时改变它的rowKey。常用的方法包括preGetOP,prePut和preScannerNext。 如果要拦截病毒,需要注意除了要拦截get之外还要拦截scanner。另外,三种pre方法都有对应的post方法。
- WALObserver。 RegionServer在写数据时会新写HLog。WAL指的是写的HLog发生事件的时候需要做的事情。比如,一个pos WALright代表向一个WAL写了ats之后要发生的事情。如果HLog写满了,重新进一个HLog时,需要做的事情便是preWALRoll。
- BulkLoadObserver。 BulkLoad数据可能大于table。BulkLoadObserver作用主要在region上,将一些HFile文件BulkLoad到region时,可以在两个点上做一些事情。比如在prePrepareBulkLoad之前clean BulkLoad命令,指BulkLoad在Bulk完成之后需要clean up ,相当于把原来的文件删掉,做一堆clean的方法,然后在clean之前可以做切面。
- EndpointObserver。 用户在自己的HBase里写一个RPC服务,通过接口来调用RPC服务,在调用Endpoint之前可以插入用户自己的逻辑代码。
2. Endpoint
Endpoint需要实现CoprocessorService接口,与Observer最大的区别是如果Coprocessor继承着上面六种Observer的话,那便属于Observer。而CoprocessorService接口其实只有一个方法,就是getService,Service是通过protobuf定义的服务。在HBase里面定义了自己的Service,一般Service都有服务发现和服务注册,HBase是通过已有的RPCService来包装扩展定义的Service。在HBase protoc文件里定义了两个Service,即MasterService和ClientService。ClientService中定义了一个方法ExcodeService,ExcodeService的输入是CoprocessorServiceRequest,相当于CoprocessorService的一个请求。首先,ExcodeService将所有的请求参数封装在它的结构里面,然后把所有的返回返回到CoprocessorService。可以理解为ExcodeService方法把服务请求和返回包装在了HBase原来的RPC的请求参数和返回参数里面。
二、Coprocessor实现
- Observer 实现
假如有一张表叫Users Table,其中存了用户信息,需求是管理员的信息不被所有的get RPC的请求获得到,即别人无法获得管理员信息,但可以获得其它数据是。针对这个需求需要做两点。第一点是在preGetOp中一个get请求过来后,发现rowKey之后对其进行处理,一种是返回空的result,另一种是在result中放入假数据或者警告。第二点规避scan。新进一个scanner每次都是调postScanNext,所以希望在每次scan一堆数据之后,如果scan到了admin,则将result中的rowKey删掉。
下图展示了一段工程(HBaseCoprocessor)中的代码(RegionObserverExample)。 首先将Region的ObserverExample记成BaseRegionObserver。定义几个变量,包括ADMIN、FAMILY、COLUMN、VALUE等。VALUE是返回的警告。实现preGetOp,即表示在get之前做的事情。将Get中的rowKey提取出来,如果等于admin的话则newCell,COLUMN_FAMILY和COLUMN等。之后返回提示性信息,之后result.addnewCell返回result。c.bypass表示不再继续,直接返回。postScannerNext方法从result中提取迭代器,如果发现result等于Admin,直接将其删掉,最后hasNext不改变它的原意,直接返回,相当于修改了result中的存储。
加载方式
在实现Observer之后需要把它放到HBase中。加载指的是开发完成之后,需要将Coprocessor打包为jar包,部署到HBase的过程。加载包含两种方式,一种是静态加载,另一种是动态加载。
a.静态加载。 静态加载是直接配置HBase-site.xml。HBase-site.xml是RegionServer和Master启动时加载的配置项。第一步,配置的属性项主要与HBase.Coprocessor.Master,HBase.Coprocessor.regionserve,HBase.Coprocessor.region,HBase.Coprocessor.user.region和HBase.Coprocessor.wal等相关。region和user.region 的区别是region可以加载多个Coprocessor,但是Coprocessor A和Coprocessor B之间有执行的优先级顺序。region.classes比user.region的优先级高。第二步,将代码放在HBase的classpath下,让HBase感知到加载的类。最后重启HBase。
b.动态加载。 动态加载是配置table的属性。动态加载只适合RegionObserver。第一步disable table。之后改用alter命令修改‘table_att’,相当于修改table的属性。属性名字叫Coprocessor,后面一段是本地路径,其中的original是本机上的Coprocessor jar包。分隔线前面第一段是oprocessor jar包,后一段是Coprocessor类,即RegionObserverExample,第三段是优先级,最后一段是需要配置的属性。第三步enable table。如此Observer就已经生效了。
2. Endpoint 实现
Endpoint的实现,主要经历几个流程。首先定义protoc文件,之后生成Java类,第三步编写Endpoint类实现Coprocessor和CoprocessorService,第四步编写客户端。第四步中编写客户端不再是table.get,而是要调用table.Coprocessor。
第一步,定义protoc文件。分别选取了java_package,生成类的名字,是否产生Service,生成equals_and_hash,optimize_for代表生成的类有几种SPEED,指生成的文件是照顾运行速度还是生成包的大小。最后在RowCountService中定义了两个接口。
第二步,用protoc生成Java类。
- cd src/main/proto
- 执行protoc命令,生成row_count.proto。 然后产生HBase包,产生RowCountProtos.java类。
第三步,编写RowCountEndpoint。RowCountEndpoint继承自RowCountProtos.RowCountService,又实现Coprocessor和CoprocessorService。首先定义一个变量RegionCoprocessorEnvironment,Environment其实与Master相关,即与region server相关。因为RowCount针对Region,所以需要定义RegionCoprocessorEnvironment才能从Environment中得到Region。在Coprocessor实例化时,调用start方法进行初始化。检测RegionCoprocessorEnvironment是不是RegionCoprocessor的Environment,否则便认为不符合预期。初始化之后getRowcount,调用一个service接口。定义scan,FirstKeyOnlyFilter指拿到第一个convalue,在RowCount时可以节省吞吐,之后通过Region得到一个scanner,保存results,最后build一个response将count设置进去。
client程序
需要注意Endpoint需要写Client程序,但Observer不需要。下图中Client写在了main里面。首先build请求参数,新建connection,新建table。,调table.CoprocessorService,传进去的第一个参数是RowCountProtos.RowCountService,恰好在proto文件里面有定义,即EndpointService的class类。相当于通知RegionServer调用哪个Endpoint。在开始split几个Region时,Coprocessor会根据startKey和endKey去count请求会落到多少个Region上。第二个参数Long表示群表Count,之后是new Batch。第三个参数Call,一个函数式接口,它传一个t类型,instance型返回一个result。传进来的instance是RowCountService,CoprocessorService底层会实例化RowCountService。RowCountService传给用户,用户自己定义RowCountService请求哪个接口,接口得到结果之后返回给上层数据。Call的核心功能是确定每次调用时调Service的哪个接口,调到接口之后将得到的结果返回给需要的数据类型,处理的是调用的业务逻辑。得到的results是一个map,包含有byte和long。调用CoprocessorObserver时它真正作用的是每个Region,所以byte指每个Region,即在Region上调用得到的结果。调用Coprocessor时得到的是一个结果集,需要对结果集做并列加和才能得到整个table的Count。
调用路由
Observer的调用过程非常简单直接。Observer的调用过程不需要客户端做额外的开发,在调用相应接口时会自动出发。首先是客户端table.get(),然后请求rsRpcService.get(),调用cop.preGetOp(),调用region,get(),最后再返回cop.postGetOp()。Endpoint首先调Service.coutRow(),然后调CoprocessorRpcChannel,表示将Service的参数封装到真正定义的RPC参数里面,再调用CoprocessorRpcChannel。 CoprocessorRpcChannel分为三种,RegionServer,Region和Master。通过Channel调到executorService,然后再通过注册机制发现服务。最后调用Endpoint。
卸载和升级
静态卸载首先删除配置文件,重写HBase,然后从lib或相关classpath下将class相应的包删掉。动态卸载仍然是disable,然后alter,注意调用的是‘table_att_unse’,然后卸载掉Coprocessor$1,再enable,最后从path下删除相关的jar包。升级指的是更新jar包,重启HBase。由于class loader的原因,仅仅更新jar包是无效的,或者仅仅是更新jar包后再unset然后再set也不一定是有效的。提供几个建议,首先Cop jar包打包成一个大的jar包,如此直接加载一个jar包便可。第二点,注意包冲突。Cop所依赖的包要与HBase 的包保持兼容,尤其是protobuf相关包。
三、Coprocessor应用
- RsGroup简介
Coprocessor的应用主要是RsGroup。RsGroup主要实现的功能是将Region Server分成不同的组,每一组负责特定的业务。RsGroup最大的优点可以将业务进行隔离,业务之间不相互影响,以及隔离meta表。RsGroup的核心功能模块有以下几块,proto文件,AdminEndpoint,Servicelmpl,Client,Server和BasedLoadBalancer等。AdminEndpoint实现CoprocessorService和MasterObserver。Servicelmpl没有使用Endpoint来实现,而是自己写了lib类实现。Client真正的业务逻辑在RsGroupAdminServer和BasedLoadBalancer里面。BasedLoadBalancer主要是通知Master启用Endpoint。
- RsGroup proto文件
proto文件定义比较简单。首先,RsGroupAdminService里面主要有Inf接口,Get接口,MoveServers,MoveTables,AddRSGroup,MoveRSGroup,BalanceRSGroup和ListRSGroupInfos。这些接口都表示RsGroup不同的目的。
- RsGroup Endpoint
RSGroupAdminEndpoint应用Coprocesso包装RsGroup RPC服务。下图可以看到它集成了BaseMasterObserver,监听MasterObserver的事件。在串进一个table时,如果有RSGroup,需要感知到table是不是能够找到默认的default RSGroup,还是会有另一个目标的RSGroup。RSGroupAdminEndpoint继承了MasterObserver,实现了CoprocessorService。实现CoprocessorService的时候仅仅是做GetService,然后返回了RSGroupAdminService,有一个它的实例类RSGroupAdminServiceImpl。 其中groupInfoManager,和groupAdminservice才是RSGroupAdminEndpoint中真正的逻辑。RSGroupAdminServiceImpl中只是简单地包了一些接口,如得到请求参数,解析请求参数,包装返回结果。
从上面两段代码可以意识到Coprocessor只是实现RPC服务的一个框架,它不应该去承载业务逻辑。