IP 转换_数据落地 | 学习笔记

简介: 快速学习 IP 转换_数据落地

开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段)IP 转换_数据落地】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/680/detail/11813


IP 转换_数据落地

 

数据落地

 

1、回顾整体思路。第一步,创建 SparkSession,在 SparkSession 当中 locadConfig,locadConfig 是在 SparkConfigHelper 当中编写的一个帮助方法。第二步,读取数据集,读取数据集就是读取本地的一个 pmt.json。第三步,对数据进行相应的操作,数据操作可能会有很多,所以创建了一个 processor 工具,这个 processor 接收一个 source,这个 source 就是原始的数据集,source 当中有一列叫做 IP,要把 IP 取出来,扩展数据集,把数据集当中增加四列,四列信息是省市和经纬度。然后就生成了一个数据集,这个数据集当中包含了 location 信息。这个时候,IPProcessor 需要做的事情就是添加这几列数据。这样就需要把dsWithLocation 落地到 Kudo 当中。前面写了相应的工具,可以直接进行落地,写了 kudohelper,spark.createkudo,大家会发现没有这个方法,那该怎么办?应该导入隐式转换,

import cn. itcast. utils.SparkConfigHelper._

import cn. itcast. utils.KuduHelper._

把 kudohelper 当中的所有方法导入,导入以后 createKudoTable,这个时候就有方法了。但是 createKudoTable 当中需要三个参数,第一个参数叫做 tablename,第二个参数叫 schema,第三个参数叫 List。

必须有这三个参数,把这三个参数写下来。

def main(args:Array[String]):Unit={

import cn. itcast. utils.SparkConfigHelper.

import cn. itcast. utils.KuduHelper.

//1.创建 SparkSession

val spark=SparkSession. builder()

.appName(name="pmt json et1")

. master(master="local[6]")

.loadConfig()

·get0rCreate()

//2.读取数据集

val source=spark. read. json(path="dataset/pmt. json ")

//3.数据操作

//对于不同的数据处理操作来说,需要把操作在这个位置调用

//processor. process

//source的意思是刚读取的数据

// 经过 IPProcessor 的处理,拿到一个包含了省市,和经纬度的数据

val dsWithLocation= IPProcessor. process(source)

//4.数据落地

spark.createKuduTable()

}

2、一般情况下,像 tablename 这种常量的东西叫做 TARGET,TARGEY 是目标的意思,这样会规范很多。考虑第二个问题,要创建一个 schema 信息。第三个 keys,用于分区的组件。接下来就可以把这三个参数传进去,然后就可以进行数据落地了。数据落地要使用 dsWithLocation.saveToKudo,但是没有这个方法,一会儿去补全它。这个原因主要是因为 kudohelper, kudohelper 当中的隐式转换,就是把 dataset 转化为 kudohelper 的隐式转换,限定的接收是 Any。如果想让所有的 dataset 转化为 KudoHelper,就不能有 Any 这个限定。所以要给一个通配符,就是下划线,这个地方改过之后点击  KudoHelper,

Private var dataset: Dataset[Any] = _

Def this(dataset: Dataset[Any]) = {

以上位置中的 any 都改成下划线,这样就没有问题了。再回到 etlrunner当中,saveToKudo 就出现了。这个时候,数据落地就已经完成了,但是这三个参数要怎么办?

Private val TARGET_TABLE_NAME = “”

Private val schema = _

Private val keys = _

3、第一个,什么叫做 TARGET_TABLE_NAME?ODS 表的表名该怎样命名?这个 etl 应该每天执行一次,表名为了表示可以保留历史数据,会按照时间进行手动的分区,也就是说表名上要带上当前日期。这样的话就要有一个前缀,前缀暂定为 ODS,后面要加上日期。打开 utils,找到 KudoHelper,KudoHelper 里面有 object 构造。可以在这直接提供一个公共的方法,这个方法就是获取到一个格式化的时间字符串,可以把它命名为 formattedDate,他什么都不用接收,但需要返回给一个字符串。这个方法当中,要获取当前的时间,把当前的时间转化为字符串。正常情况下,可以使用 SimpleDateFormat 这样一个工具进行转换,但是这个工具不是特别好用,其次,它的线程是不安全的。所以可以使用另外一个,叫做 FastDataFormat,FastDataFormat 当中要传入一些内容,getInstance,getInstance 可以传入模式,比如说 yyyMMdd。接下来可以 format 一个时间,这个时间可以直接 new date,或者也可以使用 datalay 当中的一个静态,util data,然后有一个叫 now 的函数,在普通的 data 里面是没有的,但是没关系,直接 new date 对象,这就是当前的时间。现在把时间按照当前的模式,格式化成了一个字符串,并且返回给外部。

object KuduHelper {

//一:隐式转换,SparkSession->KuduHelper DotaFrane->KuduHelper

/**

*设计隐式转换的时候,只需要考虑一件事,把 XX 转为 YY

*XX 是 SparkSession,转换函数的传入参数就是 SparkSession

*yy 是 KuduHeLper ,转换函数的结果类型应该就是 KuduHelper

*/

implicit def sparkSessionToKuduHelper(spark:SparkSession): KuduHelper = {

new KuduHelper(spark)

}

implicit def dataFrameToKudu(databaset:Dataset[ ]):Kudu Helper={

new KuduHelper(dataset)

}

def formattedDate(): String= {

FastDateFormat,getInstance(pattern="yyyMMdd"), format(new Date())

}

}

4、进入到 ETLRunner 当中,加上 kudohelper 里面的

formattedData,这个时候表名就创建出来了。

Private val TARGET_TABLE_NAME = “ODS”+KuduHelper.formattedDate()

表名创建出来是一件比较容易的事情,就是 ods 加上当前日期的格式化。

5、下一步应该创建 schemer 信息,由于 schema 信息太多,所以直接进入到写好的代码当中,进行复制,scheme 复制好以后。复制好以后把 scheme 导入,这个 scheme 就有了。需要做的事情就是导入对应的包,先导入 schema,这个 schema 要导入 kudo 的 schema,然后再导入 ColumnSchemaBuilder,Type 在导入的时候要找 kudo 的 type,要使用 kudo 的 type 才行,否则导入的内容是错误的。这个时候红线就褪去了,整个 schema 就给出来了。但是最后有 asjava,asjava 要导入 scala 的 convents。

image.png

整体上的结构是,最外层是一个 scheme,schema 里面接收一个 List,List 里面每一个条目都是 ColumnSchemaBuilder,ColumnSchemaBuilder 当中第一项是这一列的 name,第二项是这一列的类型,第三项 nullable,是否为为空,实际上是非空约束,第四项 key,说的是这一列是不是主件,builde 是为了获取ColumnSchem。

 image.png

这些 ColumnSchem 组成了 List,List 最终还要转换为 Java 类型。

6、所以要导入 scala 下的 collection 下的 JavaConverters._,把所有的隐式转换导入,这时候会发现 asJava错误消失,schema 信息到此就配置完成了。scheme 配置完成以后要传入 createKudoTable,数据集在进行 saveToKudo 的时候要 save 到这样一个 schema 表当中,

如果 dsWithLocation 数据集和 schema 当中的信息不匹配会报错,存不进去信息,所以要确保 dsWithLocation 的 schema 和  这张表的 schema 是一模一样的。所以要筛选 dsWithLocation,使用 dsWithLocation,进行 select,这个 select 就是取得所有的列,把所有的列在 dsWithLocation 数据集当中筛选出来,一定要做到和图片中的 schemer 一模一样。直接复制过去,

image.png

这就是所有要进行筛选的列,把这些列放到 val selectRows: Seq【Column】= Seq( 位置后,Import sparksq. Column,这种写法要用到隐式转换,这样的话就要把 spark 的隐式转换导入,spark seq 当中的 implicits,有了 implicits 隐式转换才能使用表的形式选择列。

import  spark. Implicits._

继续往下看,就可以把 selectRows 传入,通过:_* 把 seq 展开。因为 select 的当中接收的是一个类型后面带一个星号,指一个可变参数。所以不能直接传一个数据集进来,要把这个集合打开。在 spark当中通过:_*把它转换成一个可变参数的形式,传到 select 当中,这个时候就获取到了result。用 result 进行相应的 saveToKudo,这样整份代码就完成,

val dsWithLocation= IPProcessor. process(source)

val selectRows: Sea[Column]=Seq(

'sessionid,'advertisersid,'adorderid,'adcreativeid,'adplatformproviderid,'sdkversion,'adplatformkey,'putinmodeltype,'requestmode,'adprice,'adppprice,'requestdate,'ip,'appid,'appname,'uuid,'device,'client,'obversion,'density,'pw,'ph,'longitude,'latitude,'region,'city,'ispid,'ispname,'networkmannerid,'networkmannername,'iseffective,'isbilling,'adspacetype,'adspacetype,'devicetype,'processnode,'apptype,'district,'paymode,'isbid,'bidprice,'winprice.'iswin,'cur,'rate,'cnywinprice,'imei,'mac,'idfa,'openudid,'androidid,'rtbprovince,'rtbcity,'rtbdistrict,'rtbstreet,'storeurl,'realip,'isqualityapp,'bidfloor,'aw,'ah,'imeimd5,'macmd5,'idfamd5,''onudidmd5,'androididmd5,'imeishal,'macshal,'idfashal,'openudidshal,'androididshal,'uvidunknow,'userid,'reqdate,'reqhour,'iptype,'initbidprice,'adpayment,'agentrate,'1omarkrate/'adxrate,'title,'keywords,'tagid,'callbackdate,'channelid,'mediatype,'email,

'tel,'age,'sex

)

val result=dsWithLocation. select(selectRows: ")

//4.数据落地

spark.createKuduTable()

result.saveToKudu()

但是还差一样东西,就是 keys。

7、下面把 keys 完成,Seq(“uuid”),使用 uuid 作为分区的哈什件。看一下 structure,其实整个就是一个 main 方法,main 方法后面有一个 TARGET_TABLE_NAME,再有一个 schema 和 keys,但是 schema 太长了,大家可能会有一些不方便看清楚。把信息都设置给 createKudoTable,第一个 TARGET_TABLE_NAME,第二个 schema 信息,第三个 keys,这样表就创建出来了,表创建出来以后就可以把数据保存到表当中。这个地方报错肯定是类型的问题,他要求的是 list,但我们给的是 seq。

image.png

在 structure 当中找到 keys,然后把 seq 修改为 list。整个的代码就完成了,回到 main 方法处,运行代码。

private val TARGET TABLE NAME="ODS "+KuduHelper. formatted Date()

import scala. collection.JavaConverters.

private val schema=new Schema(List(

image.png

Private val keys = Seq(“uuid”)

8、已经运行成功了,并且没有任何问题。

但是要跟大家提一下,刚才做了两处修改。第一处修改是 mode ,

.mode(SaveMode.Append)

刚才指定了在 KudoHelper 当中进行数据追加使用 saveToKudo 的时候,忘记指定 mode,mode 只支持 Append。还有一个小修改,在 IPProcessor 当中,默认的 longitude,lotitude,默认是 fluck 类型,然后 toDouble,就做了这两处修改。这个时候数据已经添加进 kudo 了,打开浏览器进行查看,ODS_190709(6a74ba7db1364fb6bf2c7ab47668e832)是刚才创建的一个表,这个表当中就是创建的所有列。

image.png

往下会发现 impala CREATE TABLE statement,直接拷贝这个字符串,

CREATE EXTERNAL TRBLEMC ‘ODS_190709’  STORED KUDU ]

TBLPROPERTIES(

‘kudu. Table_name'='ODS_190709',

'kudu. master _addresses’=’ cdh01:7051,cdh02:7051,

cdh03:7051')

然后进入到 sell 窗口当中,进入到 impala sell 当中,直接输入这个字符串,就可以在 impala 当中创建一个表来进行相应的查看。进入到 sell 当中,使用 impala sell,连上以后,创建一个新的 create database dmp,接下来把 seq 拷过来,粘贴过来以后不要忘记加一个分号,回车,表已经创建成功了。

image.png

复制一下 select * from ODS_190709,运行的时候不要忘记加 limit,只看20条。速度会稍微有点慢,但是数据已经打印出来了。

image.png

觉得格式有问题,可以使用 hue 进行相应的查看。进入到 cdh01 当中,端口是8888。

image.png

在 hue 当中,点击 query,找到 editor,找到 impala,然后选择一个数据库,数据库要选择 dmp 的数据库,选完数据库以后 select * from ODS_190709 limit 20,按 Ctrl 进行相应的查看,它显示没有表叫做 ods_190709。

image.png

把 dmp 改为 default 这个问题就能解决了,进行查看,数据已经展示出来了,并且可以横向滑动,所以整个的效果比在 impala sell 当中进行打印的效果好。列实在是太多了,在一个窗口没有办法完全展示完。到此为止,数据查询和数据处理都已经介绍完毕。大家在进行数据落地的时候,一定不要忘记 schema 和 kudo 表的 schema 要和本身数据集的 schema 相对应,如果出现数据类型不对应或者数据名字不对应,都会影响保存。

相关文章
|
1月前
|
数据处理 数据安全/隐私保护
智能推荐映射关系,加速数据标准落地进程
在V4.0版本中,Dataphin推出了智能推荐映射关系功能,用户可以基于内置特征或创建自定义特征,对数据内容进行表示,并将其与数据标准关联,进而智能映射映射关系,尤其在字段分布广泛和命名多变的情况下,可以提高映射的准确性和效率,加速了数据标准实施。
240 0
|
1月前
|
安全 Devops API
F5是什么意思?从BIG-IP Next核心价值了解F5
F5是什么意思?从BIG-IP Next核心价值了解F5
12 0
F5是什么意思?从BIG-IP Next核心价值了解F5
|
2月前
|
运维 监控 持续交付
服务器基础知识(IP地址与自动化技术的使用)
服务器基础知识(IP地址与自动化技术的使用)
24 0
|
4月前
|
大数据 Java 分布式数据库
使用记忆法打造你的大数据组件的默认端口号记忆宫殿
使用记忆法打造你的大数据组件的默认端口号记忆宫殿
30 0
|
12月前
|
存储 SQL 人工智能
如何解决IoT数据的存储计算的实践案例
物联网数据往往来自于智能设备或者传感器设备,这些设备只要运行中,将持续不断产生设备数据。同时这些数据的使用价值根据场景千差万别,其中时效性对不同场景有很大影响,如何解决好存储和计算是物联网数字化场景的关键问题。
|
存储
《深入解析数据存储技术原理及发展演进—数据存储技术概述》电子版地址
深入解析数据存储技术原理及发展演进—数据存储技术概述
54 0
《深入解析数据存储技术原理及发展演进—数据存储技术概述》电子版地址
|
JSON 分布式计算 数据处理
IP 转换_环境准备 | 学习笔记
快速学习 IP 转换_环境准备
50 0
IP 转换_环境准备 | 学习笔记
|
OLAP
《分析型数据库标准发展与行业观察》电子版地址
分析型数据库标准发展与行业观察
52 0
《分析型数据库标准发展与行业观察》电子版地址
|
数据中心
《测试数据中心-互联网模式下新型的数据准备引擎》电子版地址
测试数据中心-互联网模式下新型的数据准备引擎
66 0
《测试数据中心-互联网模式下新型的数据准备引擎》电子版地址