开发者学堂课程【2020版大数据实战项目之DMP广告系统(第四阶段):框架搭建_Kudu工具类_读取表】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/679/detail/11805
框架搭建_Kudu工具类_读取表
内容介绍:
一、代码
二、读取 kudu 表
一、代码
Import org.apache.spark.sql.{datafram,sparksession}
Class kuduhelper{
Private var kuducontext: kuducontext=_
Private var spark : sparksession = _
Private val config = configfactory.load(“kudu”)
Def this(spark:sparksession)={
This()
This spark = spark
//create1:创建 kuducontext
//create2:加载配置文件
Val master =””
Kuducontext = new kuducontext(master,spark,sparkcontext,some(900000))
}
Def this(dateset:dateset[any])={
This(dateset.sparksession)
}
//二:具体功能的开发
//1.创建表
//2.读取表
//3.写入数据
/*1.通过隐式转换,将 sparksession 对象转 kuduhelper 的对象
/*2.通过 kuduhelper 中的 createkudutable()就可以创建一张表
*
*调用方式 spark.createkudutable()
*/
Def createkudutable():unit={
//问题一:kuducontext 来创建
//问题二:kuducontext 创建的时候,需要 kudu 服务器的地址,
//create3:如果存在则删掉旧的表
If(kuducontext.tableexists(tablename)){
Kuducontext.deletetable(tablename)
}
//create4:创建表
Import scala.collection.javaconverters._
Val options=new createtableoptions()
.setnumreplicas(conifg.getint(“kudu.table.factor”))
.addhashpartitions(keys,2)
Kuducontext.createtable(tablename,schema,)
}
/**
*参数: tablename
*返回: dateframe
*/
Def readkudutable(tablename:string):option[dateframe] ={
Import org.apache.kudu.spark.kudu_
If(kuducontext.tableexists(tablename)){
Val result = Spark.read
.option(“kudu.master”,kuducontext.kudumaster)
.option(“kudu.table”,tablename)
.kudu
Some(result)
}else{
None
}
//val m1=map(“”->””)
//val v:option[string]=m1.get(“”)
}
Def savetokudu():unit={
}
}
二、读取 kudu 表
创建库表当中大致了解 kudu 该怎么去访问,接下来就 read 一个 kudu 表。步骤已经明确了了,直接编写这个代码。
第一件事要先想清楚,去创建一个表的时候,读取一个表参数 name 要有。使用spark.read xxx CSV 读出的date frame ,那么返回也应该是 dateframe ,所以把这个方法再完善一下,要接收第一个 table name 就暂定为是一个 string ,返回值应该给 data frame ,那么 table name 和返回值已经有了。就可以直接去读,可以使用 spark.read 要稍微注意一点,是要通过 spark read 但是暂时还没有 spark ,所以第一步应该先把 spark session 放在成员级别。
接下来就要去创建新的成员变量,叫做 spark ,类型是 spark session 为 spark session 进行赋值, spark=spark ,这样其实有一点不合理,所以再加上 This 。大家就能注意到可以进行 spark read 。在 read 的时候,需要传入两个参数,第一个参数,是指定 kudu 的 master , master 要通过 config 来进行读取,也可以通过 kudu context ,读到了 kudu master 这是第一步。
第二步 option ,指定 kudu的 table ,指定一下它的表名。我们表名就是 table name 。其实它也是隐式转换,我们要判断,导入一下隐式转换, Import org.apache.kudu.spark.kudu_ 这样就有 kudu 的隐式转换了。其实这种框架,也会考虑使用隐式转换来进行,通过这一小节,了解了怎么去写 scala 的隐式转换。已经写了两个方法。假如说创建 map ,返回值是什么。例如 string 对象,返回值不是 string 而是 option ,因为 get 有可能取不到内容,如果是这样前面是 get 获取,那么读 table 也是在获取数据,应该直接把东西就写在这,就这样做不合理。 table name 有可能传进来,没有这个表,传进来直接就读,它就会报错。所以不应该直接返回,应该返回 option 对象,还要再写if else 。If (kuducontext. Tableexists (tablename)){ ,判断它是否存在,它如果存在返回对象是 option ,option 两个情况,一个是 some ,还有 none 。应该返回 some ,因为有值。接下来是 else,就是这个表不存在,应该返回 none 这个时候才算搞定,其实也就是我们必须要考虑这个表不存在的这种情况。