开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第三阶段): KuduSpark_DDL】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/678/detail/11784
KuduSpark_DDL
内容介绍:
一、准备工作
二、DDL
刚才介绍了如何使用 Java API 来操作 kudu,接下来介绍如何使用 spark 来操作 kudu。Kudu 是介于 OLTP 和 OLAP 之间的一个系统,它拥有这两个数据库系统的优势,能够快速的插入,也能够进行大规模的数据分析,但是 kudu 也是一个大数据系统,它能够存储大量的数据,甚至是以 TB 来去计算的数据,所以和 kudu 进行配合,一般情况下可能会使用 spark 这样的操作,也有一部分是使用 Java API,因为有可能需要在 web 程序里面向 kudu 中去插数据,但是更多见的还是使用 spark 来操作 kudu。这样,在 spark 操作这一部分就要简单说几个知识点,第一个知识点,如果想使用 spark 和 kudu 来进行整合,需要做一些前期工作。接下来介绍如何使用 spark 操作 kudu 的表去创建表、去删除表、去判断表是否存在,这个操作就称之为 DDL。
DDL 是对于模式的定义,在 DDL 当中要讲三个小知识点,第一个是创建表,第二个是删除表,第三个是判断表是否存在。
一、准备工作
首先,准备工作上来看,如果想使用 spark 和 kudu 进行整合,要导入一些 maven 的依赖。需要什么样的 maven 依赖呢?第一个,需要使用到 kudu-client,因为无论怎么去操作 kudu,这个 SDK 是不会变的,使用的客户端是不会变的,不过有可能 kudu-client 在其之上做了一层封装,提供了 spark 的 API。这个上层封装叫 kudu-spark,kudu-spark 这样的一个maven 库,主要目的是让 kudu 和 spark 进行整合,把 kudu 的操作能够融入到 spark 当中,这就是 kudu-spark 这个 API 的作用。然后要去使用到 spark 全家桶,既要用到 spark-core,也要用到 spark-sql,也要用到 spark-hive,如果要使用 spark,还需要去导入 Scala 的基础库,这就是 maven 里面可能需要的一些内容。
二、DDL
DDL 操作,按顺序大概有如下几个步骤。创建一个 kudu 表,但是一般情况下,如果要在 hive 当中去判断表、去创建表的时候,会写create table if not exist,会先去判断这个表是否存在。第一个操作就是通过 tableExists 判断表是否存在,假如这个表,在大数据系统当中和以往的关系型数据库是有区别的,大数据系统的数据库表一般是可以回溯的,比如在大数据系统当中创建了用于分析的一个表,这个表可能来源前面的一些其他的表,比如有可能是用户表,以及销售表,然后这两个表集合起来,变成了一个分析表,这张分析表是可以删除的,如果把这个分析表删除了,那么它里面数据有一些问题,或者这张表本身有一些问题,删除掉它以后,还是依然可以通过用户表和销售表重新创建一个分析表,所以在大数据系统当中,通过 delete table 删除某一张表是可以的。
接下来可以来做第三步操作,第三步操作就是使用 createTable 创建表,但是创建表的时候需要用到很多东西,其中有几个东西是没有办法忽略的,主要就是这三项,第一项,一定要提供创建表的 schema 信息,第二点,一定要提供创建表的主键,用哪个键来做分区,第三列,必须要给定访问表的一些参数,比如如何分区、一些附加信息,比如 factor 就是复制因子是多少,一个表要复制多少份一个 tablet 复制多少份。在做所有操作之前,需要先去配置 kudu 和 spark,使用 kudu 配合 spark 进行操作,有一个 API 是必须要创建的,叫做 KuduContext,接下来,就这些内容来写一些代码,测试并运行,看一看怎么操作。
第一步,拷贝这个 maven 库,
<!--Spark -->
<dependency>
<groupId>org. scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</ dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core 2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sqL_ 2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_ 2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<!-- Kudu Spark -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2 2.11</artifactId>
<version>1.7.0-cdh5.16.1</version>
</dependency>
之后放到工程当中,打开 pom 文件,放在 dependency 节点下,其实就是一些 spark 的依赖,然后再加上一个 kudu spark 这样的一个包,这样 kudu 和 spark 的整合就已经做完了。
接下来创建新的 scala class,命名为 KuduSparkAPI,
然后等待这个类的创建,创建好了以后,现在要做一个方法。
@Test
def ddl(): Unit = {
// 1.创建 KuduContext 和 SparkSession
val spark = SparkSession.builder()
.master( master = "1oca1[6]")
.appName( name = "kudu")
.getorCreate( )
val KUDU_ MASTERS = 192.168.169.101:7051,192.168.169. 102:7051,192.168.169.103:7051,
val kuduContext = new KuduContext (KUDU_ MASTERS, spark. sparkContext)
// 2.判断表是否存在,如果存在则删除表
val TABLE_ NAME = "students"
if (kuduContext.tableExists(TABLE_ NAME)) {
kuduContext.deleteTable(TABLE_ NAME)
}
//3.创建一张 Kudu 表
val schema = StructType(
structField("name", stringType, nullable = false)::
StructField("age", IntegerType, nullable = false)::
StructField("gpa", DoubleType, nullable = false)::Nil
)
val keys = Seq("name" )
import scala.collection. JavaConverters._
val options = new CreateTableOptions()
.setRangePartitionColumns(List("name").asJava)
.setNumReplicas(1)
kuduContext.createTable(tableName = TABLE_ NAME ,
schema = schema,
keys = keys,
options = options)
}
}
要去做一些和表相关的 DDL 操作,直接把方法名命名为 DDL,第一步,在做所有操作之前要先创建 KuduContext 和 SparkSession,因为操作 kudu 和 spark 的整合就必须要用到 KuduContext,操作 spark 就必须要用到 SparkSession,这两个是必须要去创建的。第二步,要去判断表是否存在,如果存在则删除表,第三步,创建一张 kudu 表,这就是所做的三大步操作。有一点需要声明,KuduContext 在这一小节一定是用到的,SparkSession 并不会直接用到,但是还是要给它创建出来,因为它是基础。
接下来先去做第一个,创建 SparkSession,直接 SparkSession,然后 builder,builder 以后可以设置 master,设置 master 里面传入 local6,然后设置 appName,这个 appName 叫做 kudu,getOrCreate,这个时候这行代码就写完了,整理一下格式,一定要注意自己的格式,因为如果把这个代码写好了,其实不是浪费时间,这个程序你的代码大部分时间是给人看的,偶尔放在机器里面运行,所以要善待自己,因为看这份代码的人最多的还是自己。
要创建 KuduContext,拿到 KuduContext new 一个 KuduContext,之后需要在里面传入两个参数,第一个,一定要传入 KUDU_MASTERS,说明是有多个 master,第二个,要去传入 spark 的 sparkcontext,这是两个必须要传的参数。接下来就创建出来 KUDU_MASTERS,它就是一个字符串,这个字符串叫做 cdh01,但是还是写192.168.169.101:7051 这样的 IP 地址,然后复制三个这样的字符串,分别把第二项改为102,第三项改为103,KUDU_MASTERS 创建好了,KuduContext 也创建好了。
If 一下判断表是否存在,判断 kuduContext,然后判断 tableExists,需要指定一个表名,叫做 TABLE_NAME,暂且命名为 students,一个学生表,这个时候把 TABLE_NAME 传进来,这时就判断了表名是否存在,如果存在,tableExist 会返回 true,会进入到里面的这一部分,如果这个表存在,要去删除这张表,使用 kuduContext 可以deleteTable,然后 deleteTable 里面也要传入表名叫做 TABLE_NAME,这时就做完了第二步操作,判断表是否存在,如果存在则删除表。
接下来要去创建一张新的表,如果创建一张新的表 API 是 kuduContext.createTable 注意判断表是否存在就是 tableExists,删除表就是 deleteTable,创建一张表就是 createTable,createTable 当中接收三个参数,第一个参数就是 tableName,就是 TABLE_NAME,接下来逗号,然后另起一行写第二个参数,第二个参数叫做 schema,这个 schema 现在暂时给不了,然后再另起一行写第三个参数,第三个参数就是这个主键列,叫做 key,主建列暂时先放在这,然后第四个参数是 table options,现在把它补全,所以把这给一个空 null,然后这个主键列应该叫做 prime key,第四个参数是一个 create table options,这个 options 也先放在这,现在需要创建的参数有三个,第一个是 schema,第二个是 Key,第三个是 options,但是对于 key,它是多个 key,有可能 kudu 当中会有多个主键列,所以把这指定为 keys,这就是内容,就分别创建出来没有的这三个东西,第一个东西称之为 val schema,第二个是需要一个 keys,第三个是需要一个 options,分别就要把这三个内容创建出来,如果是这样的话,这个 schema 非常简单就可以创建了,现在是和 spark 进行交互,所以这个 schema 还是要给 spark 的这个 schema,给出一个 StructType,这个 StructType 就是 spark当中的 schema 的定义,StructType 当中可以给出 StructField,这个 StructField 当中可以去接收一些首先列名,给出一个 name 列。
然后,是 string 类型的,就给出 StringType,然后,第三个它不可以为空,nullable = false,然后可以去拼接一个 StructField,后面跟上第二列是 age 列,age 列有了列名以后它是 IntegerType 的类型,接下来它不可以为空,然后再去拼接下一个 StructField,然后其中可以给出第三列的列名叫做 gpa,原来那个数据集其中有一项是平均成绩,就是一个 double 类型的 doubleType,肯定还是不可以为空,然后就可以再去拼接一个 Nil,在这个 StructType 内部就拼接出来了一个 list,这就是创建方式。Keys 比较简单,要指定出来有几列的数据作为主键,在这个里面可以只给一列,把 name 当做主键,然后 options 当中需要去指定几个参数,第一个参数,new 出来一个 createTableOptions,接下来设置 set。
可以使用 RangePartitionColumns 这样的方式来进行分区,可以指定按照哪一列来进行分区,它要求接收的是一个 list,但是这个 list 与scala 当中的 list 不是同一个 list,所以,要把 scala 当中的 list 转为 java 当中的 list,这一步要去导入一个隐式转换,引入scala 中的 collection ,然后引入collection 当中的 JavaConverters._,在这个 list 上可以调用 as.java 来去转为 java 的 list,再接下来,就可以设置 NumReplicas,就是这个复制因子设置为1,因为现在根本没有3个 tablet server,接下来 schema 传进来,然后下一项 keys 也传进来,再下一项,options 也传进来,这个时候 kudu 的表就创建出来了。
接下来就一起去运行一下这段代码
运行完毕以后,没有发现任何问题,也没有任何报错信息,说明这个表已经创建出来了。
可以打开浏览器,在浏览器地址栏里面输入101,端口是8051,打开 master 这样一个界面,选择 Tables,就能发现students 这张表已经创建出来了。