spark集成hbase与hive数据转换与代码练习

简介:   帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。 1 import java.util.Date 2 3 import org.apache.hadoop.

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

 1 import java.util.Date
 2 
 3 import org.apache.hadoop.hbase.HBaseConfiguration
 4 import org.apache.hadoop.hbase.client.{Put, Scan, Result}
 5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 6 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 7 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 8 import org.apache.hadoop.hbase.util.Bytes
 9 import org.apache.hadoop.mapred.JobConf
10 import org.apache.log4j.{Level, Logger}
11 import org.apache.spark.rdd.RDD
12 import org.apache.spark.sql.DataFrame
13 import org.apache.spark.sql.hive.HiveContext
14 import org.apache.spark.{SparkContext, SparkConf}
15 
16 /**
17  * Created by ysy on 2/10/17.
18  */
19 object test {
20 
21     case class ysyTest(LS_certifier_no: String,loc: String,LS_phone_no: String)
22 
23     def main (args: Array[String]) {
24       val sparkConf = new SparkConf().setMaster("local").setAppName("ysy").set("spark.executor.memory", "1g")
25       val sc = new SparkContext(sparkConf)
26       val sqlContext = new HiveContext(sc)
27       sqlContext.sql("drop table pkq")
28       val columns = "LS_certifier_no,LS_location,LS_phone_no"
29       val hbaseRDD = dataInit(sc,"EVENT_LOG_LBS",columns).map(data =>{
30         val id =Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
31         val loc = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
32         val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
33         (id,loc,phone)
34       })
35       val showData = hbaseRDD.foreach(println)
36       val datas = hbaseRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null)
37       val hiveDF = initHiveTableFromHbase(sc:SparkContext,sqlContext,datas)
38       writeHiveTableToHbase(sc,hiveDF)
39 
40 
41     }
42 
43   def initHiveTableFromHbase(sc:SparkContext,sqlContext: HiveContext,hiveRDD:RDD[(String,String,String)]) : DataFrame = {
44     val hRDD = hiveRDD.map(p => ysyTest(p._1,p._2,p._3))
45       val hiveRDDSchema = sqlContext.createDataFrame(hiveRDD)
46       hiveRDDSchema.registerTempTable("pkq")
47       hiveRDDSchema.show(10)
48       hiveRDDSchema
49   }
50 
51   def dataInit(sc : SparkContext,tableName : String,columns : String) : RDD[(ImmutableBytesWritable,Result)] = {
52     val configuration = HBaseConfiguration.create()
53     configuration.addResource("hbase-site.xml")
54     configuration.set(TableInputFormat.INPUT_TABLE,tableName )
55     val scan = new Scan
56     val column = columns.split(",")
57     for(columnName <- column){
58       scan.addColumn("f1".getBytes(),columnName.getBytes())
59     }
60     val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
61     System.out.println(hbaseRDD.count())
62     hbaseRDD
63   }
64 
65   def writeHiveTableToHbase(sc : SparkContext,hiveDF : DataFrame) = {
66     val configuration = HBaseConfiguration.create()
67     configuration.addResource("hbase-site.xml ")
68     configuration.set(TableOutputFormat.OUTPUT_TABLE,"EVENT_LOG_LBS")
69     val jobConf = new JobConf(configuration)
70     jobConf.setOutputFormat(classOf[TableOutputFormat])
71 
72     val putData = hiveDF.map(data =>{
73       val LS_certifier_no = data(0)
74       val LS_location = data(1)
75       val LS_phone_no = data(2)
76       (LS_certifier_no,LS_location,LS_phone_no)
77     })
78 
79     val rdd = putData.map(datas =>{
80       val put = new Put(Bytes.toBytes(Math.random()))
81       put.addColumn("f1".getBytes(),"LS_certifier_no".getBytes(),Bytes.toBytes(datas._1.toString))
82       put.addColumn("f1".getBytes(),"LS_location".getBytes(),Bytes.toBytes(datas._2.toString))
83       put.addColumn("f1".getBytes(),"LS_phone_no".getBytes(),Bytes.toBytes(datas._3.toString))
84       (new ImmutableBytesWritable, put)
85     })
86     val showRdd = rdd.foreach(println)
87     rdd.saveAsHadoopDataset(jobConf)
88   }
89 
90   }

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
142 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
3月前
|
SQL JavaScript 前端开发
基于Python访问Hive的pytest测试代码实现
根据《用Java、Python来开发Hive应用》一文,建立了使用Python、来开发Hive应用的方法,产生的代码如下
83 6
基于Python访问Hive的pytest测试代码实现
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
67 0
|
3月前
|
SQL JavaScript 前端开发
基于Java访问Hive的JUnit5测试代码实现
根据《用Java、Python来开发Hive应用》一文,建立了使用Java、来开发Hive应用的方法,产生的代码如下
82 6
|
4月前
|
监控 数据安全/隐私保护 异构计算
借助PAI-EAS一键部署ChatGLM,并应用LangChain集成外部数据
【8月更文挑战第8天】借助PAI-EAS一键部署ChatGLM,并应用LangChain集成外部数据
106 1
|
4月前
|
存储 分布式计算 Java
|
4月前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
226 1

热门文章

最新文章