借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

简介:

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块


hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前最新版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法。


 
 
  1. implicitdef stringToBytes(s: String): Array[Byte] = {  
  2. Bytes.toBytes(s)  

将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。

在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。

在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们需要改造,不使stringToBytes隐式方法,而是直接生成字节数据。


 
 
  1. protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = {  
  2. val strs = id.split(",")  
  3. val imei = strs {0}  
  4. val dateTime = strs {1}  
  5. val b1 = MD5Utils.computeMD5Hash(imei.getBytes())  
  6. val b2 = Bytes.toBytes(dateTime.toLong)  
  7. val key = b1.++(b2)  
  8. val p = new Put(key)//改造  
  9. var empty = true  
  10. for {  
  11. (family, content) <- values  
  12. (key, value) <- content  
  13. } {  
  14. empty = false  
  15. if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) {  
  16. put(p, family, key, value)  
  17. }  
  18. }  
  19. if (empty) None else Some(new ImmutableBytesWritable, p)  

这样就实现了使用自己的方式构建rowkey,当然基于此思想我们可以使用任意的方式构建rowkey。

在使用hbase-rdd插件的过程中,我在思考,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?经过查看源码,发现hbase-rdd包中提供了两个隐式方法:


 
 
  1. implicitdef toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A]) 
  2. implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A]) 

这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,从隐式定义中尝试找到解决方案,尝试之后发现有定义toHBaseRDDSimple隐式方法,于是调用该隐式方法新建HBaseWriteRDDSimple类,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。

那明白了原理,是否我们可以基于RDD写自己的模块,说干就干!

第一步:新建Trait


 
 
  1. traitHaha{ 
  2. implicitdef gaga[A](rdd: RDD[String]): Hehe= 
  3. newHehe(rdd) 

第二步:新建Hehe类


 
 
  1. final  class Hehe(rdd:RDD[String]) { 
  2. def wow(tableName:String,family:String): Unit ={ 
  3. println("---------------------------------------------"
  4. println("tableName:"+tableName+" - family:"+family) 
  5. println("size:"+rdd.count()) 
  6. rdd.collect().foreach(data=>println(data)) 
  7. println("---------------------------------------------"
  8.    } 

第三步:新建包对象


 
 
  1. package object test extends Haha 

第四步:新建test类


 
 
  1. object Test{ 
  2. def main(args: Array[String]) { 
  3. valsparkConf = new SparkConf().setAppName("Test"
  4. valsc = new SparkContext(sparkConf) 
  5. sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T"
  6.   } 

项目结构图:

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

运行效果图:

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

希望对大家以后的开发有帮助,同时借鉴本案例,在Spark Core之上构建自己的小模块。


作者:张敏

来源:51CTO

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
75 0
|
6月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
157 0
|
6月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
51 1
|
6月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
6月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
236 0
|
6月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
169 0
|
SQL 存储 分布式计算
什么是 Spark?Spark 的核心模块、Spark 的核心组件
什么是 Spark?Spark 的核心模块、Spark 的核心组件
676 0
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
211 0
Spark查询Hbase小案例
|
分布式计算 数据处理 分布式数据库
《基于HBase和Spark构建企业级数据处理平台》电子版地址
基于HBase和Spark构建企业级数据处理平台
110 0
《基于HBase和Spark构建企业级数据处理平台》电子版地址
|
分布式计算 Hadoop Linux
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
为了能够更好的查看所更新的文章,讲该博文设为索引 小约定 为了解决在编辑文件等操作的过程中的权限问题,博主一律默认采用root账户登录 对于初次安装的用户可以采用如下命令行:
140 0
云计算集群搭建记录[Hadoop|Zookeeper|Hbase|Spark | Docker]更新索引 |动态更新
下一篇
无影云桌面