KuduSpark_DF 读写 Kudu 表 | 学习笔记

简介: 快速学习 KuduSpark_DF 读写 Kudu 表

开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第三阶段)KuduSpark_DF 读写 Kudu 表学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/678/detail/11787


KuduSpark_DF 读写 Kudu 表


刚才介绍了 dataframe kudu 当中进行读取的优势,这两点优势需要建立在对于 spark 的机制,对于 kudu 的副本机制特别了解的前提之下,才能比较好的理解。如果在课上没有完全理解原理上的内容,那么课下复习 spark 的分区机制,kudu 当中的tablet 机制,然后再去思考什么叫做谓词下推,什么叫做可以读 follower,以及这两点为什么又称之为叫做优势。原理上的东西不懂没关系,代码要学会写,也可以先通过代码的编写,然后再反过来反补原理的理解。

所以接下来介绍 dataframe 如何落地到 kudu 以及如何读取 kudu 表,那么来进行一个操作。

 

一、DataFrame 写入表

切换到 idea 当中,在 idea 当中创建一个新的方法,这个方法就先去说 kudu 的表如何使用 dataframe 来进行写入。

@Test

def dfWrite(): Unit = {

// 1.创建 KuduContext SparkSession

val spark = SparkSession. builder()

.master( master = "1oca1[6]")

.appName( name = "kudu")

.getorcreate()

// 2.读取数据

val originDF = spark . read

.option( "header", value = false)

.option("delimiter", value = "\t" )

.csv( path = "dataset/studenttab1ek")

//3.写入 Kudu

val TABLE_ NAME = " students"

val KUDU_ MASTERS =192.168. 169.101:7051, 192.168.169.102:7051,192.168.169.103:7051,"

import org. apache.kudu.spark.kudu._

originDF.write

.option("kudu.table", TABLE_ NAME)

.option("kudu.master", KUDU_ MASTERS)

.mode( SaveMode.Append)

.kudu

}

}

case class Student(name: string, age: Int, gpa: Double)

 

第一个就是 dfwrite,这个时候,第一步肯定还是要创建 sparksession,直接拷一份,因为这个代码已经写过很多遍了。第二步,如果要把一个数据写到 kudu 当中,是要先去读取这个数据。前面介绍了一个 API student,有一个学生表大概有1万条数据,接下来就读取那个学生表的数据来进行相应的操作。第三步,读取完了数据就写入 kudu 中。

读取数据第一步要把数据集给整出来,先进入到 DMP 的课程目录当中,有一个 files 目录,里面有一个studenttab10K,复制这个数据集,然后打开 idea,拷贝到 idea 当中 dataset 的这个目录当中,拷贝过来以后就是这样的一个数据集,有三列。

image.png

第一列姓名,第二列年龄,第三列平均成绩,也就是 GPA,美国的成绩计算跟这不太一样,满分是五分。接下来就进入到这个代码当中,第一步,读取数据,使用 spark.readread 的时候怎么读呢?可以直接 csv。但是注意,要给定一个 option,第一个 option 要去指定 header 是什么,是没有这个 header 的,所以指定为 false。第二点要去指定 delimiter,对应的是 /t,刚才数据集中间的分隔符是一个制表符,接下来使用 csv 进行读取,从 dataset 下的 studenttab10K 读取,接下来就创建出来一个 dataframe,叫做 originDForigin 就是原始的意思,原始数据集。写入到 kudu 当中,originDF 直接 writewrite 以后需要指定一些参数,比如要告诉 dataframe,要告诉写入的框架,要访问的 kudu 的表叫什么名字。那么就 kudu.table,这个表叫什么名字呢?可以定义出来,尽量要注意这个代码规范,要先定义 TABLE_NAME,这个 TABLE_NAME 叫做 students,把 TABLE_NAME 放在这。

第二步,指定 option,要去指定 kudu master 在什么位置。 把 master 拷贝过来,之后直接把 KUDU_MASTERS 传进来。

第三步,指定 modeSaveMode 对应的是 Append,为什么写 Append 不能写 over right,因为 kudu 这个版本,就只是当前这个版本,直到在录这个视频的时候的这个版本,kudu 还不支持 data frame over right 这种模式,只支持 Append,也就是数据追加到 kudu 的表当中,但是不能覆盖以往的数据,这个时候如果要把内容写到 MySQL 当中,应该调用 jdbc 这个方法,但是如果要写到 kudu 当中,就应该调用 kudu 这个方法,但是没有 kudu 这个方法,这明显也是一个隐式转换,这个时候就可以导入对应的隐式转换,import org.apche.kudu.spark.kudu._,就这样的一个这个操作就可以使用 kudu 来去进行相应的写入了,可以去运行一下。

@Test

def dfRead(): Unit = {

val spark = SparkSession. builder()

.master( master = "1ocal[6]")

.appName( name = "kudu")

.getorcreate()

val TABLE_ NAME = " s tudents'

val KUDU MASTERS = "192.168.169. 101:7051,192.168.169.102:7051,192.168.169.103:7051"

import org.apache.kudu.spark.kudu._

val kuduDF = spark. read

.option( "kudu.table", TABLE_ NAME )

.option("kudu. master", KUDU_ _MASTERS)

.kudu

kuduDF . createOrReplaceTempView( viewName = "kudu_ students")

val projectDF = spark.sq1( sqlText = "select name from kudu_ students where gpa > 2")

projectDF.show()

}

}

运行一下以后,如何进行读取,相对来说更简单了,就直接 test def-dfReaddfRead 当中第一步把 sparksession 拷过来,还要把 table master 的名字拷过来,所以把这两个参数也拷过来,就可以进行相应的读取了。直接使用spark 来进行 read 这样的一个方法进行读取,读取的时候第一步,要指定 optionoption 里面要有 kudu.table,要指定是哪张表,那么就是 TABLE_NAME

第二点,要指定一个 option 去告诉 kudumaster 在哪,要把 kudu master 指定出来,第三点,在读取的时候可以直接.kudu 来进行相应的读取。但是在使用 kudu 之前,应该先去引入刚才的隐式转换,把隐式转换拿出来放在相应位置,这个时候就可以进行读取了,读取出来一个 dataframe,叫做 kuduDF,读取出来 kuduDF 以后可以把 kuduDF createOrReplaceTempView,创建出来 kudu_students 这样的一张表,创建出来这张表以后直到 kuduDF.createorReplaceTempview(viewName="kudu_students") 这行代码的时候,其实还没有发生读取这一行为,还没有遇到 action 操作,这个读取根本就不会发生。创建一个 projectDF,也就是投影过的 df,去验证一下投影,可以直接 select name 这一列,from kudu_students where gpa>2,这样就做了又有筛选,又有投影。接下来生成的 df val projectDF = spark.sql( sqlText = "select name from kudu_students where gpa > 2") 这行代码为止,读取行为还没有发生,在拿到 projectdf show 之后要去展示数据的时候就是一个 action 了,遇到 action 以后数据才可能真正的发生读取。在运行这段代码之前先看一看刚才出错了

image.png

unknown column:_cs,这个是一个非常常见的问题,这个问题是由于在读取 student 的时候,在 Write 的这个方法当中, 在读取 student 的数据当中里面没有 header,没有 header 是没有指定 schema,读取出来的列名叫做 c01c02这样的列名,应该把schema 拷过来,读取数据的时候要去指定 schema,指定完 schema 以后把 schema 通过这个 schema传进来,然后 schema,再试着去读取这个数据集写入到 kudu 当中。在进行写入,写入还没有执行完,再去执行一下写入,通过 schema 读取 student 数据集的时候,通过 schema 指定了 dataframe 当中的列名,在往 kudu 里面塞的时候再去存储到 kudu 的时候,这个列名就对应了 kudu 表当中的 nameage 对应了 kudu 表当中的 age,这个时候就能存进去了,它就知道哪一行数据、哪一列数据对应它表中的哪一列数据

image.png

同时也能看到执行是没有问题的,Test past

写入没有问题了,来读一下,来去 project,来去谓词上推一下,看看有没有问题。去运行 read

image.png

这个时候数据很明显已经获取到了,直接查看这个数据集,这些人其实就是成绩超过平均分2分的人,只打印出来了 name 这一列,这就是这一份代码,这就是 spark dataframe 整合 kudu 的操作方式。

简单回顾, spark 整合kudu,首先写就是 write,然后读就是 read

相关文章
|
20天前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
150 0
|
5天前
|
SQL 分布式计算 JavaScript
利用SparkSQL读写Excel数据
利用SparkSQL读写Excel数据
12 0
|
20天前
|
SQL 分布式计算 关系型数据库
Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】
Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】
|
9月前
|
SQL HIVE
Hive新增表在Impala查不到问题解决
Hive新增表在Impala查不到问题解决
93 0
|
SQL 存储 HIVE
hive简单操作练习-表操作
hive简单操作练习-表操作
|
SQL 分布式计算 HIVE
spark sql编程之实现合并Parquet格式的DataFrame的schema
spark sql编程之实现合并Parquet格式的DataFrame的schema
298 0
spark sql编程之实现合并Parquet格式的DataFrame的schema
|
SQL 存储 数据库
hive 删除某个分区中部分数据
hive 删除某个分区中部分数据
|
SQL 分布式计算 Java
SparkSQL 读写_Hive_写入数据_配置 | 学习笔记
快速学习 SparkSQL 读写_Hive_写入数据_配置
195 0
|
SQL 分布式计算 大数据
SparkSQL 读写_Hive_读取 Hive 表 | 学习笔记
快速学习 SparkSQL 读写_Hive_读取 Hive 表
247 0
SparkSQL 读写_Hive_读取 Hive 表 | 学习笔记
|
SQL JSON 分布式计算
SparkSQL 读写_分区 | 学习笔记
快速学习 SparkSQL 读写_分区
337 0