开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段):IP 转换_数据落地】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/680/detail/11813
IP 转换_数据落地
数据落地
1、回顾整体思路。第一步,创建 SparkSession,在 SparkSession 当中 locadConfig,locadConfig 是在 SparkConfigHelper 当中编写的一个帮助方法。第二步,读取数据集,读取数据集就是读取本地的一个 pmt.json。第三步,对数据进行相应的操作,数据操作可能会有很多,所以创建了一个 processor 工具,这个 processor 接收一个 source,这个 source 就是原始的数据集,source 当中有一列叫做 IP,要把 IP 取出来,扩展数据集,把数据集当中增加四列,四列信息是省市和经纬度。然后就生成了一个数据集,这个数据集当中包含了 location 信息。这个时候,IPProcessor 需要做的事情就是添加这几列数据。这样就需要把dsWithLocation 落地到 Kudo 当中。前面写了相应的工具,可以直接进行落地,写了 kudohelper,spark.createkudo,大家会发现没有这个方法,那该怎么办?应该导入隐式转换,
import cn. itcast. utils.SparkConfigHelper._
import cn. itcast. utils.KuduHelper._
把 kudohelper 当中的所有方法导入,导入以后 createKudoTable,这个时候就有方法了。但是 createKudoTable 当中需要三个参数,第一个参数叫做 tablename,第二个参数叫 schema,第三个参数叫 List。
必须有这三个参数,把这三个参数写下来。
def main(args:Array[String]):Unit={
import cn. itcast. utils.SparkConfigHelper.
import cn. itcast. utils.KuduHelper.
//1.创建 SparkSession
val spark=SparkSession. builder()
.appName(name="pmt json et1")
. master(master="local[6]")
.loadConfig()
·get0rCreate()
//2.读取数据集
val source=spark. read. json(path="dataset/pmt. json ")
//3.数据操作
//对于不同的数据处理操作来说,需要把操作在这个位置调用
//processor. process
//source的意思是刚读取的数据
// 经过 IPProcessor 的处理,拿到一个包含了省市,和经纬度的数据
集
val dsWithLocation= IPProcessor. process(source)
//4.数据落地
spark.createKuduTable()
}
2、一般情况下,像 tablename 这种常量的东西叫做 TARGET,TARGEY 是目标的意思,这样会规范很多。考虑第二个问题,要创建一个 schema 信息。第三个 keys,用于分区的组件。接下来就可以把这三个参数传进去,然后就可以进行数据落地了。数据落地要使用 dsWithLocation.saveToKudo,但是没有这个方法,一会儿去补全它。这个原因主要是因为 kudohelper, kudohelper 当中的隐式转换,就是把 dataset 转化为 kudohelper 的隐式转换,限定的接收是 Any。如果想让所有的 dataset 转化为 KudoHelper,就不能有 Any 这个限定。所以要给一个通配符,就是下划线,这个地方改过之后点击 KudoHelper,
Private var dataset: Dataset[Any] = _
Def this(dataset: Dataset[Any]) = {
以上位置中的 any 都改成下划线,这样就没有问题了。再回到 etlrunner当中,saveToKudo 就出现了。这个时候,数据落地就已经完成了,但是这三个参数要怎么办?
Private val TARGET_TABLE_NAME = “”
Private val schema = _
Private val keys = _
3、第一个,什么叫做 TARGET_TABLE_NAME?ODS 表的表名该怎样命名?这个 etl 应该每天执行一次,表名为了表示可以保留历史数据,会按照时间进行手动的分区,也就是说表名上要带上当前日期。这样的话就要有一个前缀,前缀暂定为 ODS,后面要加上日期。打开 utils,找到 KudoHelper,KudoHelper 里面有 object 构造。可以在这直接提供一个公共的方法,这个方法就是获取到一个格式化的时间字符串,可以把它命名为 formattedDate,他什么都不用接收,但需要返回给一个字符串。这个方法当中,要获取当前的时间,把当前的时间转化为字符串。正常情况下,可以使用 SimpleDateFormat 这样一个工具进行转换,但是这个工具不是特别好用,其次,它的线程是不安全的。所以可以使用另外一个,叫做 FastDataFormat,FastDataFormat 当中要传入一些内容,getInstance,getInstance 可以传入模式,比如说 yyyMMdd。接下来可以 format 一个时间,这个时间可以直接 new date,或者也可以使用 datalay 当中的一个静态,util data,然后有一个叫 now 的函数,在普通的 data 里面是没有的,但是没关系,直接 new date 对象,这就是当前的时间。现在把时间按照当前的模式,格式化成了一个字符串,并且返回给外部。
object KuduHelper {
//一:隐式转换,SparkSession->KuduHelper DotaFrane->KuduHelper
/**
*设计隐式转换的时候,只需要考虑一件事,把 XX 转为 YY
*XX 是 SparkSession,转换函数的传入参数就是 SparkSession
*yy 是 KuduHeLper ,转换函数的结果类型应该就是 KuduHelper
*/
implicit def sparkSessionToKuduHelper(spark:SparkSession): KuduHelper = {
new KuduHelper(spark)
}
implicit def dataFrameToKudu(databaset:Dataset[ ]):Kudu Helper={
new KuduHelper(dataset)
}
def formattedDate(): String= {
FastDateFormat,getInstance(pattern="yyyMMdd"), format(new Date())
}
}
4、进入到 ETLRunner 当中,加上 kudohelper 里面的
formattedData,这个时候表名就创建出来了。
Private val TARGET_TABLE_NAME = “ODS”+KuduHelper.formattedDate()
表名创建出来是一件比较容易的事情,就是 ods 加上当前日期的格式化。
5、下一步应该创建 schemer 信息,由于 schema 信息太多,所以直接进入到写好的代码当中,进行复制,scheme 复制好以后。复制好以后把 scheme 导入,这个 scheme 就有了。需要做的事情就是导入对应的包,先导入 schema,这个 schema 要导入 kudo 的 schema,然后再导入 ColumnSchemaBuilder,Type 在导入的时候要找 kudo 的 type,要使用 kudo 的 type 才行,否则导入的内容是错误的。这个时候红线就褪去了,整个 schema 就给出来了。但是最后有 asjava,asjava 要导入 scala 的 convents。
整体上的结构是,最外层是一个 scheme,schema 里面接收一个 List,List 里面每一个条目都是 ColumnSchemaBuilder,ColumnSchemaBuilder 当中第一项是这一列的 name,第二项是这一列的类型,第三项 nullable,是否为为空,实际上是非空约束,第四项 key,说的是这一列是不是主件,builde 是为了获取ColumnSchem。
这些 ColumnSchem 组成了 List,List 最终还要转换为 Java 类型。
6、所以要导入 scala 下的 collection 下的 JavaConverters._,把所有的隐式转换导入,这时候会发现 asJava错误消失,schema 信息到此就配置完成了。scheme 配置完成以后要传入 createKudoTable,数据集在进行 saveToKudo 的时候要 save 到这样一个 schema 表当中,
如果 dsWithLocation 数据集和 schema 当中的信息不匹配会报错,存不进去信息,所以要确保 dsWithLocation 的 schema 和 这张表的 schema 是一模一样的。所以要筛选 dsWithLocation,使用 dsWithLocation,进行 select,这个 select 就是取得所有的列,把所有的列在 dsWithLocation 数据集当中筛选出来,一定要做到和图片中的 schemer 一模一样。直接复制过去,
这就是所有要进行筛选的列,把这些列放到 val selectRows: Seq【Column】= Seq( 位置后,Import sparksq. Column,这种写法要用到隐式转换,这样的话就要把 spark 的隐式转换导入,spark seq 当中的 implicits,有了 implicits 隐式转换才能使用表的形式选择列。
import spark. Implicits._
继续往下看,就可以把 selectRows 传入,通过:_* 把 seq 展开。因为 select 的当中接收的是一个类型后面带一个星号,指一个可变参数。所以不能直接传一个数据集进来,要把这个集合打开。在 spark当中通过:_*把它转换成一个可变参数的形式,传到 select 当中,这个时候就获取到了result。用 result 进行相应的 saveToKudo,这样整份代码就完成,
val dsWithLocation= IPProcessor. process(source)
val selectRows: Sea[Column]=Seq(
'sessionid,'advertisersid,'adorderid,'adcreativeid,'adplatformproviderid,'sdkversion,'adplatformkey,'putinmodeltype,'requestmode,'adprice,'adppprice,'requestdate,'ip,'appid,'appname,'uuid,'device,'client,'obversion,'density,'pw,'ph,'longitude,'latitude,'region,'city,'ispid,'ispname,'networkmannerid,'networkmannername,'iseffective,'isbilling,'adspacetype,'adspacetype,'devicetype,'processnode,'apptype,'district,'paymode,'isbid,'bidprice,'winprice.'iswin,'cur,'rate,'cnywinprice,'imei,'mac,'idfa,'openudid,'androidid,'rtbprovince,'rtbcity,'rtbdistrict,'rtbstreet,'storeurl,'realip,'isqualityapp,'bidfloor,'aw,'ah,'imeimd5,'macmd5,'idfamd5,''onudidmd5,'androididmd5,'imeishal,'macshal,'idfashal,'openudidshal,'androididshal,'uvidunknow,'userid,'reqdate,'reqhour,'iptype,'initbidprice,'adpayment,'agentrate,'1omarkrate/'adxrate,'title,'keywords,'tagid,'callbackdate,'channelid,'mediatype,'email,
'tel,'age,'sex
)
val result=dsWithLocation. select(selectRows: ")
//4.数据落地
spark.createKuduTable()
result.saveToKudu()
但是还差一样东西,就是 keys。
7、下面把 keys 完成,Seq(“uuid”),使用 uuid 作为分区的哈什件。看一下 structure,其实整个就是一个 main 方法,main 方法后面有一个 TARGET_TABLE_NAME,再有一个 schema 和 keys,但是 schema 太长了,大家可能会有一些不方便看清楚。把信息都设置给 createKudoTable,第一个 TARGET_TABLE_NAME,第二个 schema 信息,第三个 keys,这样表就创建出来了,表创建出来以后就可以把数据保存到表当中。这个地方报错肯定是类型的问题,他要求的是 list,但我们给的是 seq。
在 structure 当中找到 keys,然后把 seq 修改为 list。整个的代码就完成了,回到 main 方法处,运行代码。
private val TARGET TABLE NAME="ODS "+KuduHelper. formatted Date()
import scala. collection.JavaConverters.
private val schema=new Schema(List(
Private val keys = Seq(“uuid”)
8、已经运行成功了,并且没有任何问题。
但是要跟大家提一下,刚才做了两处修改。第一处修改是 mode ,
.mode(SaveMode.Append)
刚才指定了在 KudoHelper 当中进行数据追加使用 saveToKudo 的时候,忘记指定 mode,mode 只支持 Append。还有一个小修改,在 IPProcessor 当中,默认的 longitude,lotitude,默认是 fluck 类型,然后 toDouble,就做了这两处修改。这个时候数据已经添加进 kudo 了,打开浏览器进行查看,ODS_190709(6a74ba7db1364fb6bf2c7ab47668e832)是刚才创建的一个表,这个表当中就是创建的所有列。
往下会发现 impala CREATE TABLE statement,直接拷贝这个字符串,
CREATE EXTERNAL TRBLEMC ‘ODS_190709’ STORED KUDU ]
TBLPROPERTIES(
‘kudu. Table_name'='ODS_190709',
'kudu. master _addresses’=’ cdh01:7051,cdh02:7051,
cdh03:7051')
然后进入到 sell 窗口当中,进入到 impala sell 当中,直接输入这个字符串,就可以在 impala 当中创建一个表来进行相应的查看。进入到 sell 当中,使用 impala sell,连上以后,创建一个新的 create database dmp,接下来把 seq 拷过来,粘贴过来以后不要忘记加一个分号,回车,表已经创建成功了。
复制一下 select * from ODS_190709,运行的时候不要忘记加 limit,只看20条。速度会稍微有点慢,但是数据已经打印出来了。
觉得格式有问题,可以使用 hue 进行相应的查看。进入到 cdh01 当中,端口是8888。
在 hue 当中,点击 query,找到 editor,找到 impala,然后选择一个数据库,数据库要选择 dmp 的数据库,选完数据库以后 select * from ODS_190709 limit 20,按 Ctrl 进行相应的查看,它显示没有表叫做 ods_190709。
把 dmp 改为 default 这个问题就能解决了,进行查看,数据已经展示出来了,并且可以横向滑动,所以整个的效果比在 impala sell 当中进行打印的效果好。列实在是太多了,在一个窗口没有办法完全展示完。到此为止,数据查询和数据处理都已经介绍完毕。大家在进行数据落地的时候,一定不要忘记 schema 和 kudo 表的 schema 要和本身数据集的 schema 相对应,如果出现数据类型不对应或者数据名字不对应,都会影响保存。