开发者学堂课程【NoSQL 数据库 Kudu 教程:Spark 操作 kudu --环境搭建 & ;创建表操作】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/723/detail/12908
Spark 操作 kudu --环境搭建 & ;创建表操作
内容介绍:
一.引入依赖
二.创建表
到目前为止,已经听说过几个上下文,例如 SparkContext , SQLContext , HiveContext , SparkSession , 现在,我们将使用 Kudu 引入一个 KuduContext 。这是可在 Spark 应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu java 客户端进行交流。 KuduContext 提供执行 DDL 操作所需的方法,与本机 Kudu RDD 的接口,对数据执行更新/插入/删除,将数据类型从 Kudu 转换为 Spark 等。
通过 spark 去操作 kudu。kudu 作为数据存储系统,spark 作为数据分析计算的引擎,两个段可以直接做一个整合,有时候后续可以通过 Apache 直接查询 kudu 当中表的数据,或者说针对 kudu 进行增删改查操作法,那么在 spark 操作 kudu 的时候,整体逻辑跟 java 操作 kudu 比较类似,也是通过一个所谓的客户端跟我们交互,只不过在 spark 当中,这个类叫做 KuduContext 一个上下文对象,这个对象就会代表我们的 spark 程序,跟我们的 kudu 的 java 客户端进行交互,进行数据表的相关的增删改查操作,所以说这个上下文很关键。事实上学习完之后,我们会明白在 spark ,甚至说在当下,大家会接触了很多上下文, SparkContext , HaviContext , SparkSession 等,我们当下就出现了一个新的,有了它之后,它上面就封装好了各种 app ,各种方法,看到它们所规定的属性参数传递就可以执行相关的操作。
一.引入依赖
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupld>org.apache.kudu</groupld>
<artifactld>kudu-client-tools</artifactld>
<version>1.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupld>org.apache.kudu</groupld>
<artifactld>kudu-client</artifactld>
<version>1.6.0-cdh5.14.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2-->
<dependency>
<groupld>org.apache.kudu</groupld>
<artifactld>kudu-spark2_2.11</artifactld>
<version>1.6.0-cdh5.14.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.spark/spark-sql-->
<dependency>
<groupld>org.apache.spark</groupld>
<artifactld>spark-sql_2.11</artifactld>
<version>2.1.0</version>
</dependency>
</dependencies>
首先先要引入一下依赖,这里我们需要几个包:kudu-client-tools 工具包, kudu-client 包以及 kudu-spark2_2.11 整合包。 kudu-spark2_2.11 包很重要,这里面存放着 KuduContext ,没有它我们就无法开发。还有 spark-sql_2.11 包,后面我们也可以通过 spark-sql 来去访问 Kudu 数据。在这里有个细微区别,我们当下这里面使用的几个包都是来自于 cdh5.14.0 商业版本,并不是 Apache 版本。所以说需要在 program 中加入一个 repository ,因为我们知道 cdh 的 jar 包并没有出现在中央仓库中,所以必须指定用 cloudera 的仓库下载,事实上 apache 版本和这些版本从我们的 API 层面来说没有太大的区别,在软件兼容性上面连续做会更好一点。明白这些以后,我们进行相关操作。
打开 IDEA
打开 pom.xml 文件。在当中把之前 apache 版本的 kudu-client 给它注释掉,引入我们新的 kudu-client 版本,选择好下面这些包之后,重新加载一下。
如果说你自己的本地环境没有联网的话,能起到一个联网的环境,使得把这些依赖下载进来。
下载完之后,编写 spark 程序。这里可能会遇到一个工具上的小问题,比如说:在这里我们右键选择一个 Scala Class ,但是有的时候会发现并没有 Scala Class ,只有 Java Class 。这时候选择 File ,找到 Project Structure 工程结构。
选择 Global Libraries ,这里有一个 scala-sdk-2.11.8 编译器,其实如果出错了也会有,这是后把这个编译器选中,点击减号,把它删除。
删除之后,点击加号,重新手动再加一遍 scala-sdk-2.11.8 ,把这个系统中已绑定的版本 scala-sdk ,点击 ok ,加入到当前的 example-kudu 当中,点击 ok 。就可以编写 scala 程序了。
如果有 Scala Class 就直接用,如果没有就按上述方法来做一个简单的修改。接下来我们简单编译一下,先 new 一个 Scala Class ,定义一个 Object ,名字叫做 testSparkKudu ,点击 ok 。
创建好之后,进来是一个 spark 的对象,首先来一个 mian 方法。
二.创建表
接下来是想办法固定使得 api ,当去创建表的时候,先不看具体 api ,应该先明白整个思路。需要建立连接,要清楚创建的表叫什么名字,有哪些字段,有哪些是主键,有什么重要的选项分居属于什么,然后就调用 create table api 。怎么猜想就怎么操作,只不过 api 叫什么名字,我们可不知道,可以去官网找。这里要注意,在具体去创建表之前,需要去初始化,构建很多跟开发环境相关的对象,最终要创建的就是 KuduContext , KuduContext 就是 spark 当中用与跟 kudu 进行通信的一个对象,里面分组的方法,但是需要它并不是一下子就出来的,它依赖于SparkContext , SparkContext 需要 SparkConf 对象,所以说在前面我们需要创建这几个对象,这里我们采用 spark 2.0 之后的一个新的方式 SparkSession , 有了 SparkConf 可以直接用 SparkContext ,也可以用 Session 进行一些具体的操作。
首先打开编译器。
1. 构建 sparkconf 对象。
2. 对它进行一个相关的设置,比如说这个 spark 程序的名字,用什么模式来启动,这里我们根据讲义来使用本地模式,因为现在不提到继续本地性测试一下,感受一下 spark 与 kudu 的一个整合,所以说先 set 一个叫做 AppName 。名字就和 SparkKudu 一样。
3. 接下来再 set 一下 Master。Spark 主节点 set.Master 。使用 local 模式,启动两核 local 。通过这个 api 使用快捷键 ctrl +alt + v ,就得到了 sparkconf 。
4. 接下来我们去构建 sparksession 对象。直接用 sparksession中的builder 方法,通过这个方法调用它的 config ,还需要 sparkConf ,叫做 getOrCreate ,直接返回就得到 sparksession 。
5. 构建 sparkcontext 对象。接下来用sparkSession.sparkContext ctrl + alt + v 返回一个 sparkContext 。通常称作 sc 。
6. 接下来就是构建最重要的 kuducontext 对象,用于和kudujava 客户端交互,操作 kudu 数据。直接通过api 去 new 一个 KuduContext ,里面需要两个参数,一个是 kudumaster ,另一个是 SparkContext 。
接下来找到 kudumaster 的地址。
当前这个 KuduContext 已经过时了,但是不妨碍我们使用,还需要使用一个 sc ,之前是直接指定 KuduContext 是一个过程 api ,但是传到 sc 这个是不过时的,通过它,ctrl + alt + v 就得到了 kuduContext 。
后面的操作都是根据 kuduContext 来的。我们可以测试一下。这里面很多方法已经提供给我们了。诸如,创建表 createTable ,删除表 deleteTable ,表是否存在 tableExists 等等。创建表之后,基于表再进行数据的增删改查也是比较方便的。
7. 接下来定义一个表的创建。通过 kuducontext 创建一张表。创建一张表显然和 createTable 有关,但是敲完这个 api 后我们会发现里面的属性有点多,点进去可以发现,这里面有很多个参数,表名 tableName 是我们所需要的, schema 也需要,有哪些字段,还有 StructType ,这是 schema 的数据类型, keys 主键,还有 options 选项,所以说和 java api 最大的区别就是一点。最重要的一点是必须结合的 api ,它需要什么,就指定什么。根据之前的要求,大概需要这几个信息,表名, schema , 主键 keys ,表的选项 options 。
8. 创建所需要的对象,首先先把相关的东西提到上面来,比如说相关的表名 tableName ,指定一下我们创建的表名,创建一个名叫 spark_kudu_student 的表名。表名是通过这个 spark 来创建的。
schema 需要的对象属性: StructType 。 StructType 这个类型比较好创建,我们去指定表有哪些字段类型的时候,必须去创建一个 StructType ,用 StructField 来指定字段和数据类型。但是要注意 StructType 需要的是一个集合,所以这里就需要用 spark 的语法去创建,
比如这里定义了四个字段,这些字段是由加一直加到 Nil 一个空的集合当中,在集合当中,首部添加上这四个字段构成我们的 StructType 。当中做一个导包,选中 StructType ,alt + enter ,一定要注意导的是哪个包。
在上面看一下,看看导的包对不对。
StringTble 也需要导包。注意我们导的都是 spark.sql 类型。
IntegerType 也需要导包。 false 是什么意思呢?后面叫做 nullable ,Boolean 类型的。说明它这里用 Boolean 类型来控制这个字段是不是可以为空。比如说,这里是 false ,它必须不能为空,如果是 true 就是允许为空。这里 false 表示不能为空,最后就能满足我们的需求了。
接下来指定表的主键信息。这里我们以 Id 为主键。注意写完之后,还得看看 p_key 到底是不是 String 类型。这里会发现,类型不匹配。 expected 是 Seq[String] ,但是 actual 是 String 。所以这里要进行一个修改。
指定表的 option 属性。在 java 中通过 createtable option 。点击 option ,会发现里面也叫做 CreateTableOptions , api 都是一样的。
通过它返回我们会得到 options
接下来在 option 当中,指定一个 Hash 的分区方式。后面 columns 是一个集合, List 列根据哪个字段进行分区,以及分区的个数 int 。
指定用于分区的字段。它所要的是一个集合,而且是一个List<String> 集合。然后 new 一个ArrayList ,这时候我们可以直接把 java 当中的api 直接调过来,在 spark 中可以无缝地调用 java api 。然后再指定主键,以及字段的分区数。
判断该表是否存在,如果不存在,进行表的创建。首先做一个判断,用 kuduContext 判断 tableExists 。意思是如果表名 tableName 不存在,就执行 kuduContext.createTable 操作。至此,我们开发创建表的操作就都完成了,我们会发现 api 非常多,但是思路又是很清晰,这几个字段信息包括表名,表的 schema ,表的主键,表的 option 性和 java 操作一摸一样,只不过这里需要注意 api , api 不需要记,可以从官网,或者参考 pdf ,最后再对后面的信息做一下简单修改就可以了。
9. 最后运行一下 spark_kudu_student 表。最后运行出来的结果为 exit code 0 ,运行成功。
最后回到 kudu 进行一个刷新,点击 Tables ,这里多了一个表叫做 spark_kudu_student 。点击 00d2c5f477e5403bb018effc69dfdcde 。
表的字段:id , name , age , sex 都没有问题,根据 id 做一个 Hash 分为6个部分,这样我们就完成相关的创建操作。跟之前 java 操作一摸一样,只不过 api 换成了 spark scala 的这一套语法。
代码如下:
package cn.itcast.kudu
import org.apache.spark.SparkConf
/**
*Created by Allen Woon
*/
object testSparkKudu{
//指定创建的表名
val tableName =
“spark_kudu_student”
def main(args:Array[string]):Unit={
//构建 sparkconf 对象
val sprkConf: SparkConf=new SparkConf().setAppName(
“SparkKudu ”).setMaster(“local[2]”)
//构建 sparksession 对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//构建 sparkcontext 对象
val sc: SparkContext=sparkSession.sparkContext
//todo 构建 kuducontext 对象 用于和 kudujava 客户端交互 操作 kudu 数据
val kuduContext=new KuduContext(
“node-1:7051,node-2:7051,node-3:7051”,sc)
//通过 kuducontext 来创建一张表(需要传递表名 schema 主键 option)
//定义表的 schema 信息
val schema = StructType(
//用 StructFeild 封装字段(名称,类型,是否可以为空)
StructFeild(
“Id”,StringType,false) ::
StructFeild(
“name”,StringType,false) ::
StructFeild(
“age”,IntegerType,false) ::
StructFeild(
“sex”,StringType,false) :: Nil)
//指定表的主键信息
val p_key =Seq(
“Id”)
//指定表的 option 属性
val options = new CreateTableOptions
//指定用于分区的字段
val list = new util.ArrayList[String]()
list.add(
“Id”)
options.addHashPartitions(list,6)
//判断该表是否存在 如果不存在 进行表的创建
If(!kuduContext.tableExists(tableName)){
kuduContext.createTable(tableName,schema,p_key,options)
}
}
}