Spark 操作 kudu--dataFrame , sparkSQL 操作 | 学习笔记

简介: 快速学习 Spark 操作 kudu--dataFrame , sparkSQL 操作

开发者学堂课程【NoSQL 数据库 Kudu 教程Spark 操作 kudu--dataFrame , sparkSQL 操作学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/723/detail/12910


Spark 操作 kudu--dataFrame , sparkSQL 操作


内容介绍:

一.使用 dataFrame 进行读取 kudu 数据

二.使用 dataFrame 写数据到 kudu

三.使用 sparksql 操作 kudu

 

spark 操作 kudu 当中的另一套 api 。主要是通过 dataFrame 相关的 api 进行数据的读取和写操作,甚至最后可以使用 sparkSQL 写标准的 SQL 语句来操作 kudu 中的表 。

 

一.使用 dataFrame 进行读取 kudu 数据

打开 IDEA,用 dataFrame 首先得去获得 dataFrame 。这时使用 sparkSession 在当中我们就可以进行 read 读取,可以读取 json 文件格式,可以读取 text 文本格式,现在还有读取 kudu 的方式,我们需要去指定读取的数据在哪里,这里提供了一个 option 的选项,就是把我们读取数据的哪些以及在哪里相关的信息告诉给我们,可以选择 option 往里面封装,也可以选择 options 里面封装的一个集合,例如我们需要几个甚至多个字段属性都放到 Map 当中。这里我们使用 options ,还需要 kuduOption

image.png

接下来我们需要指定 kuduOption ,它需要的是一个集合,所以我们直接来创建一个 Map 。在这个当中的我们需要两个,第一个叫做 kudu.master kudu.master 在下面指定。

image.png

为了方便,把这一块做一个稍微的修改。

image.png

把下面的代码截取上来。为了引用方便,指定 kudu 集群 master 地址

image.png

不要忘了在下面也填上 kuduMaster 。要不然等会创建不了 kuduContext

image.png

指向 kuduMaster 这是第一个属性,第二个属性是 kudu.table ,它指向 tableName 。有了这两个 option 就可以构建 dataFrame

image.png

返回来创建 kuduOption ,接下来采用 kudu 的方法,帮助转化成 dataFrame ,但是这个方法需要手动地去导报,导报的路径就在下面 import org.apache.kudu.spark.kudu_ 。一定需要导包。

image.png

点击 kudu 返回 ,这个方法返回来就是 DataFrame 的一个类型。接下来直接通过 show 方法,来展示结果。这样就完成了使用 dataFrame 读取数据。右键执行一下。上面是通过 dataFrame 来读取数据的,下面是传统的查询数据,两个结果应该是一样的。

image.png

执行结束,这就是通过 dataFrame 来读取数据,上面显示的是表的形式,下面是之前传统的形式。

image.png

 

二.使用 dataFrame 写数据到 kudu

首先准备一下数据,直接复制一下

image.png

插入一个新的数据,记录为5,名字叫做 itheima ,年龄 20 ,性别男。通过 data 创建一个 RDD ,把它转化成一个 DataFrame

image.png

接下来用 dataFrame.write 写到 options 里,这里还需要 kuduOption ,这样就会把数据写到 kudu 表当中。后面再加上 .kudu

image.png

接下来看一下操作能否执行。最终查询没问题,插入操作出现了错误。之前再讲 dataFrame 进行数据写的时候有所谓的几种模式。

image.png

在这里进行 write 的时候 ,加入 mode 。这里默认的是报错模式,我们应该选择 append

image.png

覆盖模式和忽略模式都不支持,默认是一个报错模式,所以一定要说明它的模式 mode ,接下来我们运行一下。上面呢是我们在插入之前操作的两条记录,下面是我们插入之后的。这就完成了通过 dataFrame 读数据到 kudu

image.png


三.使用 sparksql 操作 kudu

最后,这里还支持通过 sparksql 操作 kudu 。拿到 dataFrame ,可以把它注册成一个表,然后用 sql 语句来查询,更加地方便。

在之前读取数据的时候,已经得到一个 DataFrame ,来做一个结果返回。在 kudu 这里 ctrlaltv

返回一个叫做 dataFrame1 ,然后再把使用 dataFrame 写数据的代码先注释掉。

image.png

首先把 dataFrame1 给它创建成一个临时表,起个名字叫做 tmp 。基于这个表进行创建会非常方便

image.png

sql里面就能写标准 sql 了,比如说这里写 select* from tmp 。讲义上面更高大上,还创建了临时表,把查询的结果往里面进行插入,做一个条件判断。我们这里做一个查询显示结果。另外再统计一个 sparkSession.sql 叫做 select cout(*) from tmp

image.png

右键运行一下,看是否能运行成功。这样就返回出了结果。第一个 select * 的结果,第二个 count(1) 的结果,第三个传统的结果。Spark sql java 更加地简单。

image.png

代码如下:

package cn.itcast.kudu

import org.apache.spark.SparkConf

/**

*Created by Allen Woon

*/

//定义样例类 用于封装表的一行数据

case class StudentId:String,name :String,ageInt,sex:Stringobject testSparkKudu{

//指定创建的表名

val tableName = spark_kudu_student

//指定 kudu 集群 master 地址

Val kuduMaster = node-1:7051,node-2:7051,node-3:7051

//指定 kuduOption

Val kuduOption = Map(

kudu.master ->

)

def main(args:Array[string]):Unit={

//构建 sparkconf 对象

val sprkConf: SparkConf=new SparkConf().setAppName(SparkKudu   ).setMaster(local[2])

//构建 sparksession 对象

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

//构建 sparkcontext 对象

val sc: SparkContext=sparkSession.sparkContext

sc.setLogLevel(warn)

//todo 构建 kuducontext 对象 用于和kudujava 客户端交互 操作 kudu 数据

val kuduContext=new KuduContext(kuduMaster,sc)

//todo 通过 kuducontext 操作 kudu

//1.通过 kuducontext 来创建一张表(需要传递表名 schema 主键 option)

createTable(kuduContext)

//2.插入数据操作

insertData(sparkSession,sc,kuduContext)

//3.查询表的数据

//指定查询表的字段

queryData(sc,kuduContext)

/*val columnProjection List(Id,name,age,sex)

val rowRDDRDD[Row]=kuduContext.kuduRDD(sc,tableName,columnProjection)*/

//4.更新表的数据

updateData(sparkSession,sc,kuduContext)

queryData(sc,kuduContext)

/* val data = List(Student(1”,“zhangsan,20,m))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF

//kuduContext.updateRows(); 更新表的数据如果记录不存在报错

kuduContext.upsertRows() //记录存在进行更新操作,否则执行插入操作。*/

//5.删除表的数据

deleteData(sparkSession,sc,kuduContext)

/*val data = List(Student(2”,“lisi,30,w))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF.select(Id)

kuduContext.deleteRows(dataFrame,tableName)*/

//6.使用 dataFrame 进行读取 kudu 数据

val dataFrame1: DataFrame = sparkSession.read.opitons(kuduOption).kudu

//7.使用 dataFrame 写数据到 kudu

val data = List(Student(5”,“itheima,20,m))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF

//Currently,only Append is supported 当下只支持追加模式到数据

dataFrame.write.mode(append).options(kuduOption).kudu

//8.使用 sparksql 读取 kudu 数据

dataFrame1.createTempView(tmp)

sparkSession.sql(select * from tmp).show()

SparkSession.sql(select count(*) from tmp).show

queryData(sc,kuduContext)

sc.stop()

}

Private def deleteData(sparkSession: SparkSession: SparkContext,kuduContext: KuduContext)= {

//准备待插入的数据

val data = List(Student(1”,“zhangsan,20,m),Student(1”,“zhangsan,20,m),Student(1”,“zhangsan,20,m))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF

kuduContext.insertRows(dataFrame,tableName)

}

Private def createTable(kuduContext: KuduContext)={...}

//判断该表是否存在 如果不存在 进行表的创建

If(!kuduContext.tableExists(tableName)){

//定义表的 schema 信息

val schema = StructType(

// StructFeild 封装字段(名称,类型,是否可以为空)

StructFeild(Id,StringType,false) ::

StructFeild(name,StringType,false) ::

StructFeild(age,IntegerType,false) ::

StructFeild(sex,StringType,false) :: Nil)

//指定表的主键信息

val p_key =Seq(Id)

//指定表的 option 属性

val options = new CreateTableOptions

//指定用于分区的字段

val list = new util.ArrayList[String]()

list.add(Id)

options.addHashPartitions(list,6)

kuduContext.createTable(tableName,schema,p_key,options)

}

}

}

相关文章
|
6月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
149 2
|
1月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
39 1
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
78 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
34 0
|
1月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
38 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
50 0
|
1月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
37 0
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。