spark集成hbase与hive数据转换与代码练习

简介:   帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。 1 import java.util.Date 2 3 import org.apache.hadoop.

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

 1 import java.util.Date
 2 
 3 import org.apache.hadoop.hbase.HBaseConfiguration
 4 import org.apache.hadoop.hbase.client.{Put, Scan, Result}
 5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 6 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 7 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 8 import org.apache.hadoop.hbase.util.Bytes
 9 import org.apache.hadoop.mapred.JobConf
10 import org.apache.log4j.{Level, Logger}
11 import org.apache.spark.rdd.RDD
12 import org.apache.spark.sql.DataFrame
13 import org.apache.spark.sql.hive.HiveContext
14 import org.apache.spark.{SparkContext, SparkConf}
15 
16 /**
17  * Created by ysy on 2/10/17.
18  */
19 object test {
20 
21     case class ysyTest(LS_certifier_no: String,loc: String,LS_phone_no: String)
22 
23     def main (args: Array[String]) {
24       val sparkConf = new SparkConf().setMaster("local").setAppName("ysy").set("spark.executor.memory", "1g")
25       val sc = new SparkContext(sparkConf)
26       val sqlContext = new HiveContext(sc)
27       sqlContext.sql("drop table pkq")
28       val columns = "LS_certifier_no,LS_location,LS_phone_no"
29       val hbaseRDD = dataInit(sc,"EVENT_LOG_LBS",columns).map(data =>{
30         val id =Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
31         val loc = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
32         val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
33         (id,loc,phone)
34       })
35       val showData = hbaseRDD.foreach(println)
36       val datas = hbaseRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null)
37       val hiveDF = initHiveTableFromHbase(sc:SparkContext,sqlContext,datas)
38       writeHiveTableToHbase(sc,hiveDF)
39 
40 
41     }
42 
43   def initHiveTableFromHbase(sc:SparkContext,sqlContext: HiveContext,hiveRDD:RDD[(String,String,String)]) : DataFrame = {
44     val hRDD = hiveRDD.map(p => ysyTest(p._1,p._2,p._3))
45       val hiveRDDSchema = sqlContext.createDataFrame(hiveRDD)
46       hiveRDDSchema.registerTempTable("pkq")
47       hiveRDDSchema.show(10)
48       hiveRDDSchema
49   }
50 
51   def dataInit(sc : SparkContext,tableName : String,columns : String) : RDD[(ImmutableBytesWritable,Result)] = {
52     val configuration = HBaseConfiguration.create()
53     configuration.addResource("hbase-site.xml")
54     configuration.set(TableInputFormat.INPUT_TABLE,tableName )
55     val scan = new Scan
56     val column = columns.split(",")
57     for(columnName <- column){
58       scan.addColumn("f1".getBytes(),columnName.getBytes())
59     }
60     val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
61     System.out.println(hbaseRDD.count())
62     hbaseRDD
63   }
64 
65   def writeHiveTableToHbase(sc : SparkContext,hiveDF : DataFrame) = {
66     val configuration = HBaseConfiguration.create()
67     configuration.addResource("hbase-site.xml ")
68     configuration.set(TableOutputFormat.OUTPUT_TABLE,"EVENT_LOG_LBS")
69     val jobConf = new JobConf(configuration)
70     jobConf.setOutputFormat(classOf[TableOutputFormat])
71 
72     val putData = hiveDF.map(data =>{
73       val LS_certifier_no = data(0)
74       val LS_location = data(1)
75       val LS_phone_no = data(2)
76       (LS_certifier_no,LS_location,LS_phone_no)
77     })
78 
79     val rdd = putData.map(datas =>{
80       val put = new Put(Bytes.toBytes(Math.random()))
81       put.addColumn("f1".getBytes(),"LS_certifier_no".getBytes(),Bytes.toBytes(datas._1.toString))
82       put.addColumn("f1".getBytes(),"LS_location".getBytes(),Bytes.toBytes(datas._2.toString))
83       put.addColumn("f1".getBytes(),"LS_phone_no".getBytes(),Bytes.toBytes(datas._3.toString))
84       (new ImmutableBytesWritable, put)
85     })
86     val showRdd = rdd.foreach(println)
87     rdd.saveAsHadoopDataset(jobConf)
88   }
89 
90   }

目录
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
788 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
284 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
3月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1761 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
3月前
|
机器学习/深度学习 SQL 大数据
什么是数据集成?和数据融合有什么区别?
在大数据领域,“数据集成”与“数据融合”常被混淆。数据集成关注数据的物理集中,解决“数据从哪来”的问题;数据融合则侧重逻辑协同,解决“数据怎么用”的问题。两者相辅相成,集成是基础,融合是价值提升的关键。理解其差异,有助于企业释放数据潜力,避免“数据堆积”或“盲目融合”的误区,实现数据从成本到生产力的转变。
什么是数据集成?和数据融合有什么区别?
|
4月前
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
299 2
|
10月前
|
容灾 安全 关系型数据库
数据传输服务DTS:敏捷弹性构建企业数据容灾和集成
数据传输服务DTS提供全球覆盖、企业级跨境数据传输和智能化服务,助力企业敏捷构建数据容灾与集成。DTS支持35种数据源,实现全球化数据托管与安全传输,帮助企业快速出海并高效运营。瑶池数据库的全球容灾、多活及集成方案,结合DTS的Serverless和Insight功能,大幅提升数据传输效率与智能管理水平。特邀客户稿定分享了使用DTS加速全球业务布局的成功经验,展示DTS在数据分发、容灾多活等方面的优势。
288 0
|
10月前
|
人工智能 安全 Dubbo
Spring AI 智能体通过 MCP 集成本地文件数据
MCP 作为一款开放协议,直接规范了应用程序如何向 LLM 提供上下文。MCP 就像是面向 AI 应用程序的 USB-C 端口,正如 USB-C 提供了一种将设备连接到各种外围设备和配件的标准化方式一样,MCP 提供了一个将 AI 模型连接到不同数据源和工具的标准化方法。
4547 100
|
5月前
|
运维 安全 数据管理
Dataphin V5.1 企业级发布:全球数据无缝集成,指标管理全新升级!
企业数据管理难题?Dataphin 5.1版来解决!聚焦跨云数据、研发效率、指标管理和平台运维四大场景,助力数据团队轻松应对挑战。无论是统一指标标准、快速定位问题,还是提升管理安全性,Dataphin都能提供强大支持。3分钟了解新版本亮点,让数据治理更高效!
106 0
|
9月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2374 45
|
10月前
|
机器学习/深度学习 PyTorch 测试技术
LossVal:一种集成于损失函数的高效数据价值评估方法
LossVal是一种创新的机器学习方法,通过在损失函数中引入实例级权重,直接在训练过程中评估数据点的重要性,避免了传统方法中反复重训练模型的高计算成本。该方法适用于回归和分类任务,利用最优传输距离优化权重,确保模型更多地从高质量数据中学习。实验表明,LossVal在噪声样本检测和高价值数据点移除等任务上表现优异,具有更低的时间复杂度和更稳定的性能。论文及代码已开源,为数据价值评估提供了高效的新途径。
237 13
LossVal:一种集成于损失函数的高效数据价值评估方法