开发者学堂课程【2020版大数据实战项目之DMP广告系统(第四阶段):框架搭建_Kudu工具类_写入数据】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/679/detail/11806
框架搭建_Kudu工具类_写入数据
内容介绍:
一、代码
二、写入数据
三、小结
一、代码
Import org.apache.spark.sql.{datafram,sparksession}
Class kuduhelper{
Private var kuducontext: kuducontext=_
Private var spark : sparksession = _
Private val dataset:dataset[any]=_
Private val config = configfactory.load(“kudu”)
Def this(spark:sparksession)={
This()
This spark = spark
//create1:创建 kuducontext
//create2:加载配置文件
Val master =””
Kuducontext = new kuducontext(master,spark,sparkcontext,some(900000))
}
Def this(dateset:dateset[any])={
This(dateset.sparksession)
This.dataset=dataset
}
//二:具体功能的开发
//1.创建表
//2.读取表
//3.写入数据
/*1.通过隐式转换,将 sparksession 对象转 kuduhelper 的对象
/*2.通过 kuduhelper 中的 createkudutable()就可以创建一张表
*
*调用方式 spark.createkudutable()
*/
Def createkudutable():unit={
//问题一:kuducontext 来创建
//问题二:kuducontext 创建的时候,需要 kudu 服务器的地址,
//create3:如果存在则删掉旧的表
If(kuducontext.tableexists(tablename)){
Kuducontext.deletetable(tablename)
}
//create4:创建表
Import scala.collection.javaconverters._
Val options=new createtableoptions()
.setnumreplicas(conifg.getint(“kudu.table.factor”))
.addhashpartitions(keys,2)
Kuducontext.createtable(tablename,schema,)
}
/**
*参数: tablename
*返回: dateframe
*/
Def readkudutable(tablename:string):option[dateframe] ={
Import org.apache.kudu.spark.kudu_
If(kuducontext.tableexists(tablename)){
Val result = Spark.read
.option(“kudu.master”,kuducontext.kudumaster)
.option(“kudu.table”,tablename)
.kudu
Some(result)
}else{
None
}
//val m1=map(“”->””)
//val v:option[string]=m1.get(“”)
}
/**
*spark.creatkudutable
*spark.creatkudutable
*dateset.savetokudu
*/
Def savetokudu():unit={
//1.判断本方法是从 dataset 上调用的
If(dataset==null){
Throw new runtimeexception(“请从 dataset 上开始调用“)
}
//2.保存数据
Import org.apache.kudu.spark.kudu._
Dataset.write
.option(“kudu.master”,kuducontext.kudumaster)
.option(“kudu.table”,rablename)
.kudu
}
}
二、写入数据
最后一个方法很复杂,读取数据的时候通过spark.read kudu table 。创建表通过 spark.create table。在进行写的时候,通过 spark 来 save 这样调用不合理。一般情况下拿到数据,把数据存到数据库当中, data frame 是数据,或者 data set 是数据。之前都是通过 spark 来调用,接下来就要通过 data set 进行调用了,就应该把 data set 拿到,这个方法既然只能在 data set 上调用,理论上 spark 也可以调用这个方法。
因为 spark 也可以被隐式转换过来,所以第一步肯定要去判断,第二步才可以去保存数据。如果是这样,要拿到这个sight,大家注意前面隐式转换的方法,在方法里给了 dataset ,既然是这样,就走到类的定义这个位置。构造函数明显不靠谱,要的不是构造函数,因为构造函数拿到的是 sparks 。
所需要的是这个函数,这个函数才能拿到 data set 。所以 Private val dataset:dataset[any]=_ 在不同函数里,就把 data set 赋值变成了成员变量,接下来,就可以进行具体功能了。、第一步,判断本方法是从 set 上调用的,那么判断一下 data set 如果等于空,就说明没有这个概念,应该 throw new exception ,然后data set 上开始调用,其实这是 runtime time exception ,在 spark 上调用了 kudu 没有任何结果,就会直接报错,代码中断。
接下来保存数据,通过 data set , white 写入到 kudu 当中。但是要指定 option ,第一个option to do ,第二个 master 通过 kudu context 的成员变量指定。接下来第二个 option ,可以指定kudu table ,首先要添加 table name 。
可以调用 kudu 进行保存,这个地方报红线,因为要进行隐式转换的导入, Import org. apache. kudu. spark. kudu._ 这个时候,就有了 kudu 隐式转换,整个 kudu helper 工具类就已经搞定了。
三、小结
回顾这部分的内容,个人建议,大家尽量不要顺着代码从头往后复习。有的同学一个一个看一行一行去复习,这样没有意义,不要这样去看这份代码。
首先找到叫做 structure 一个视图,点开视图,大家就会发现类里面,就是 kuduhelper 文件当中有两个东西。第一个是 class ,叫 kudu helper ,第二个是 object ,也叫 kuduhelper ,是一个伴生对象, kudu helper 伴生对象当中,两个方法,第一个是 implicit ,第二个还是 implicit ,就可以去看它传入的参数是谁,既然知道传入参数是什么类型了,也知道传出参数是什么类型了,那么就可以推断隐式转换,是把一个什么对象转成了另外一个什么对象,结果就是隐式转换的函数,把 spartsession 转为 kudu helper ,另一个把 dataset 转为 kuduhelper ,就是把 kudu helper 当中的功能方法扩展给 spartsession 和 dataset 。因此明白 object 是什么了。接下来再打开 class ,其实就是把 kuduhelper 类的功能和方法拓展给 spartsession 和dataset ,直接看最后三个方法,第一个 create kudu table ,第二个 Read kudu table ,第三个 save to kudu table ,创建表,读取表和保存数据。这样整个流程就串起来了,大家也要这样去复习。 kudu helper 暂时告一段落。