Spark 操作 kudu -- 增加,删除,修改,查询操作 | 学习笔记

简介: 快速学习 Spark 操作 kudu -- 增加,删除,修改,查询操作

开发者学堂课程【NoSQL 数据库 Kudu 教程Spark 操作 kudu -- 增加,删除,修改,查询操作学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/723/detail/12909


Spark 操作 kudu -- 增加,删除,修改,查询操作


内容介绍:

一.插入数据 insert 操作

二.查询表的数据

三.更新数据 upsert 操作 更新数据 update 操作

四.删除数据 delete 操作

 

用  spark  中的  dataFrame  去操作  kudu  中的表。之前它封装了一个类型叫做  kuducontext  ,里面有各种各样的方法,  api 供我们使用。

数据的增删改查的相关操作:Kudu 支持许多 DML 类型的操作   ,其中一些操作包含在Spark on Kudu 集成,包括:

NSERT- DataFrame 的行插入 Kudu 表。请注意,虽然 API 完全支持 INSERT ,但不鼓励在 Spark中使用它。使用 INSERT 是有风险的,因为 Spark 任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在, INSERT 将不允许插入行(导致失败)。相反,我们鼓励使用下面描述的 INSERT IGNORE

INSERT-IGNORE- DataFrame 的行插入 Kudu 表。如果表存在,则忽略插入动作。

DELETE- Kudu 表中删除 DataFrame 中的行。

UPSERT- 如果存在,则在 Kudu 表中更新 DataFrame 中的行,否则执行插入操作。

UPDATE -更新dataframe中的行。

首先打开 IDEA ,因为代码大部分是一样的,比如说构建相关的对象,包括去连接的地址,没有必要重写一遍,为了方便,在这里做一下代码的重构。

首先把创建表的语句给它抽成一种方法,让它单独存在。整个逻辑是从 todo 通过 kuducontext 操作 kudu 开始,第一个方法叫做创建一张表,需要传递表名,下面可以做一下局部优化。如果表不存在,可以进行一个创建表的操作。但是在判断之前,前面这些方法或者定义的数据类型已经有了,所以我们应该更好地表达,把If(!kuduContext.tableExists(tableName)) 提到前面,在创建表之前,判断好之后,把这一段逻辑从我们这里做一个相关的抽取。从判断该表是否存在一直到末尾,按住快捷键 altShiftM (有的人应该是 ctrlaltM ) ,抽取完之后,把这个方法处理到当前的 sparkkudu 当中。给它起个名字 createTable 。创建好之后,

image.png

创建好之后,建表的语句就放在下面了。后面作为方法可以关闭。这样就完成了第一个。事实上专业一点,最后应该把 sc 给关闭一下,不关闭也没关系,因为随着方法的结束,会把它强制关闭。专业一点可以这么写。这样在进行增删改查之前,做了与代码相关的重构,使得看起来稍微简洁一点。

image.png

 

一.插入数据 insert 操作

kuduContext ,点进去发现方法有很多,这里用 insertRows 插入一行记录,插入数据的时候,首先需要数据 data ,数据类型 dataFrame 以及 表名 tableName 。接下来准备待插入的数据。

image.png

创建一个集合,dataFrame 可以用好多方式去创建,这里我们创建一个 List 来存放数据。为了方便封装数据,定义一个样例类。有了样例类之后。样例类关键是case class ,起个名字叫做 student ,这个类中需要一些字段属性,比如: string 类型的 ID , name , sex ,还有 int 类型的 age 。有了样例类之后,在这个数据里面就比较方便了,直接来一个 Student 对象,然后简单准备几个数据,比如说1号叫做 zhangsan ,年龄是 20 ,性别男的表示 m ,女的表示 w 。多在集合里面存放几个数据,接下来放几个样例类。2号叫做 lisi ,年龄为30岁,是一个女生。3号叫做 wangwu ,年龄为30岁,性别为男。有了数据之后,把它们转化成 dataFrame ,可以用 sc 去简析集合, sparkcontext 可以使文件读取,一种数据读取。所以我们直接调用sc.parallelize 简析 data ,把它转为RDD ,叫做 studentRDD ,这个 RDD 当中存放的就是每一个数据的对象。有了 RDD 之后,我们把它转化为 dataFrame ,调用它的 toDF 方法。但是发现没有这种方法,这是因为需要把 sparksession 的一次转化给导入进来,所以还需要进行 import 操作, import 当前环境下 sparkSession 路径下的叫做 implicits 下的所有。转化完之后,就可以把 toDF 转化为 dataFrame 。再把 toDF 传给它,进行相关的 insertRows 操作,就完成了数据的插入。接下来做一下执行,看一下是否能插入成功。再插入之前,把创建表的语句给注释掉。运行一下。

image.png

这里面会涉及大量 spark 的语法

image.png

这个地方出现一个错误 Unknown column: ID ,这表名我们有的字段写错了,或者说字段不能够识别 ID

image.png

下面定义的是 Id , 而上面写的是 ID

image.png

image.png

修改完之后,重新执行一遍。没有报错,执行成功。这里显示打印的日志比较多,我们设置一下日志级别。这样就完成了插入表数据。

image.png

image.png

插入表数据通过 kudu 页面,看不出来里面的数据。

image.png

接下来我们做一下数据的相关的查询。为了方便,把这段方法逻辑给它抽成一个方法。选中这些逻辑, shiftaltm 快捷键,定义一个方法叫做 insertData

image.png

完成之后,系统报错。

image.png

这里我们针对这个方法做一个相关的修改。在这里直接把sparkSession 传进来,否则转换之后找不到。把当前环境中的 sparkSession 传进来。在 private def 里面也加一个 sparkSession。这样也就多加了一个方法,完成数据插入工作。

image.png

 

二.查询表的数据

首先用 kuduContext ,调用 kuduRDD ,里面需要 sc , tableName 以及还需要指定查询表的字段。创建一个集合 List ,这几条数据进行 ctrlaltv 返回 columnProjection 。这样就完成了字段的指定。 RDD 返回的结果 ctrlaltv ,就变成了 rowRDD ctrlaltv 可以看出 RDD 里面封装的 Row 就是一行记录,kuduRDD 通过去查询数据,它把表当中的一行记录变成 RDD 当中的一条记录,所以这样查阅起来对 RDD 的便利就已经搞定了。通过 rowRDD 来访问它的 foreach 方法,来个匿名函数,直接 println ,把里面的每一个进行一个打印。这样就完成了数据的打印。现在把插入数据操作给注释掉,看看查询数据是否正常。

image.png

可以看出最后成功地返回了结果。符合我们刚才所查询的记录。

image.png

为了方便,也把这一段抽成一个方法,选中指定查询表字段的代码 shiftaltm ,将这种方法命名为 queryData

image.png

 

三.更新数据 upsert 操作 更新数据 update 操作

首先先把查询数据的代码给注释掉。接下来就是更新表的数据,我们会发现这些 api 相比较于 java 是非常简单的。还是通过 kuduContext ,更新数据的方法这里有两个,一个叫做 updateRows ,另一个叫做 upsertRows 。我们对这两个进行一个简单的说明:updateRows 更新表的数据如果记录不存在报错; upsertRows 记录存在进行更新操作,否则执行插入操作。接下来看一下 upsertRows 需要那些方法,更新数据需要 dataFrame , tableName tableName 有了,需要准备 dataFrame 。需要更新的数据从下借鉴一个,将中间这段进行复制,把它放到 upsertRows 里。接下来更新 zhangsan 这个数据,1zhangsan 更改为 itcast ,年龄更改为18,性别选择不变。通过它我们创建了 studentRDD.toDF 。直接进行相关的操作。

image.png

接下来运行一下,看看是否能完成更新操作。结果发现没有报错,因为我们调用的是 upsertRows 操作。

image.png

把查询的方法进行一个复制

image.png

接下来我们把更新数据也抽成一个方法。选中这些代码,shiftaltm ,将其命名为 updateData

image.png

这里还需要改为 sparkSession 。这就是完整的一个抽取。

image.png

接下来先把更新表数据的代码先注释掉,然后运行一下,看是否把它变过来。最终结果成功地更新了数据。如果说我们指定的 Id 不存在,不是我们的 Id ,这时候就会报错,当然具体还得看相关的方法,我们调用的是 upsertRows 还是 updateRows

image.png

 

四.删除数据 delete 操作

删除数据操作分为删除一条记录和删除表

删除表数据就比较简单了。还是通过 kuduContext寻找 deleteRows 也是需要 DataFrame tableName 。下面还有一个叫做 deleteTable ,只有 tableName ,其作用是直接删除表。删除表就不在这里详细叙述了。

image.png

同样也得准备数据,我们也从下面借鉴一下。

image.png

比如说把2 lisi 把这个数据准备成 dataFrame 来进行删除。能不能删除,运行一下。发现报错了,删除不应该包含这个字段 name

image.png

column 当中我们要删除表,我们只要根据 Id ,就是我们所谓的主键就可以进行删除了,那么至于叫什么不用关心,所以说我们这里准备的数据有两个,比如说 List 当中构建的字段中有四个属性:Id ,name,age,sex, 但是删除的时候只需要 Id dataFrame 提供了studentRDD.toDF 方法,基于这个我们再调用一下它的 select 。这个时候只需要拿出 Id 就行了。

image.png

注意我们只能根据主键进行删除,再次运行一下,最终返回两条记录。最终成功删除记录。

image.png

这就是我们通过 spark kudu 整合的一个 api ,重点用 kuduContext 进行数据表的增删改查操作。最后为了方便,也可以把删除表数据的代码抽成一种方法。选中代码,shiftaltm ,将其命名为 deleteDate

image.png

代码如下:

package cn.itcast.kudu

import org.apache.spark.SparkConf

/**

*Created by Allen Woon

*/

//定义样例类 用于封装表的一行数据

case class StudentId:String,name :String,ageInt,sex:String

object testSparkKudu{

//指定创建的表名

val tableName = spark_kudu_student

def main(args:Array[string]):Unit={

//构建 sparkconf 对象

val sprkConf: SparkConf=new SparkConf().setAppName(SparkKudu   ).setMaster(local[2])

//构建 sparksession 对象

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

//构建 sparkcontext 对象

val sc: SparkContext=sparkSession.sparkContext

sc.setLogLevel(warn)

//todo 构建 kuducontext 对象 用于和 kudujava 客户端交互 操作 kudu 数据

val kuduContext=new KuduContext(node-1:7051,node-2:7051,node-3:7051,sc)

//todo 通过 kuducontext 操作 kudu

//1.通过 kuducontext 来创建一张表(需要传递表名 schema 主键 option)

createTable(kuduContext)

//2.插入数据操作

insertData(sparkSession,sc,kuduContext)

//3.查询表的数据

//指定查询表的字段

queryData(sc,kuduContext)

/*val columnProjection List(Id,name,age,sex)

val rowRDDRDD[Row]=kuduContext.kuduRDD(sc,tableName,columnProjection)*/

//4.更新表的数据

updateData(sparkSession,sc,kuduContext)

queryData(sc,kuduContext)

/* val data = List(Student(1”,“zhangsan,20,m))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF

//kuduContext.updateRows(); 更新表的数据如果记录不存在报错

kuduContext.upsertRows() //记录存在进行更新操作,否则执行插入操作。*/

//5.删除表的数据

deleteData(sparkSession,sc,kuduContext)

/*val data = List(Student(2”,“lisi,30,w))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF.select(Id)

kuduContext.deleteRows(dataFrame,tableName)*/

queryData(sc,kuduContext)

sc.stop()

}

Private def deleteData(sparkSession: SparkSession: SparkContext,kuduContext: KuduContext)={

//准备待插入的数据

val data = List(Student(1”,“zhangsan,20,m),Student(1”,“zhangsan,20,m),Student(1”,“zhangsan,20,m))

val studentRDD: RDD[student] = sc.parallelize(data)

import sparkSession.implicits._

val dataFrame: DataFrame = studentRDD.toDF

kuduContext.insertRows(dataFrame,tableName)

}

Private def createTable(kuduContext: KuduContext)={...}

//判断该表是否存在 如果不存在 进行表的创建

If(!kuduContext.tableExists(tableName)){

//定义表的 schema 信息

val schema = StructType(

// StructFeild 封装字段(名称,类型,是否可以为空)

StructFeild(Id,StringType,false) ::

StructFeild(name,StringType,false) ::

StructFeild(age,IntegerType,false) ::

StructFeild(sex,StringType,false) :: Nil)

//指定表的主键信息

val p_key =Seq(Id)

//指定表的 option 属性

val options = new CreateTableOptions

//指定用于分区的字段

val list = new util.ArrayList[String]()

list.add(Id)

options.addHashPartitions(list,6)

kuduContext.createTable(tableName,schema,p_key,options)

}

}

}

相关文章
|
5月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
43 0
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
84 0
|
5月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
103 0
|
7月前
|
分布式计算 Hadoop 大数据
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
|
5月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
95 1
|
4月前
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作
|
4月前
|
JSON 分布式计算 关系型数据库
Spark中使用DataFrame进行数据转换和操作
Spark中使用DataFrame进行数据转换和操作
|
4月前
|
缓存 分布式计算 监控
Spark RDD操作性能优化技巧
Spark RDD操作性能优化技巧
|
4月前
|
分布式计算 数据处理 Apache
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
|
6月前
|
SQL 分布式计算 Java
201 Spark SQL查询程序
201 Spark SQL查询程序
35 0