开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段): IP 转换_思路梳理】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/680/detail/11811
IP 转换_思路梳理
思路梳理
1、具体的 IP 转换的工作,IP转换的工作把它放在一个新的processor 类下。先创建一个文件,文件名称叫做 IPProcessor,创建好这样一个文件之后,把 class 改为 object。
Package cn.itcast.et1
class IPProcessor
有一个 trait 叫possessor,可以让IPProcessor extends Processor。这个时候就可以添加代重写的一个方法,叫做 process。
object IPProcessor extends Processor{
override def process():Unit=???
但是要注意,IPProcessor 里面应该接收什么内容?应该返回什么东西?既然要对整个数据集进行 ET L,可能会对数据集作一些操作,比如说删掉某些列,或者增加某些列,或者是删掉某些数据,或增加某些数据。最方便的是把 datafree 传下来,进入 processor 的 trait 当中,process 方法里面应该接收一个 dataset,对应的 dataset 应该是 row,把 row 导入,返回的时候返回什么不太确定。现在对 dataset 进行处理,处理完成以后,应该给一个完整的数据集回去,应该在 dataset 里面还放上 row,因为 row 是一个通用的对象,这个时候 processor 的类就设计好了。
import org. apache. spark. sql.{Dataset, Row}
trait Processor{
def process(databaset:Datasest:Datasest[Row]):Dataset[Row]
}
2、回到 IPProcessor 当中,把 process 方法去掉,重新 inplemention,就能得到完整的 process 方法,把这个方法补全。在 process 类当中应该做的事情,如果不明确要做什么事情,不妨先调用一下。进入到 etlrunner 当中,
ETLRunner 当中第三步应该拿到 IPProcessor.process,这里面应该传入一个数据集,把 show 去掉,这个数据集就是 source。source 代表的意思是刚读取出来的数据,然后经过 process 的处理,应该拿到一个包含了省市和经纬度的数据集,然后就可以拿到 result ip,或者是 dswith 这样一个数据集。
//2.读取数据集
val source=spark. read. json(path="dataset/pm. json ")
//3.数据操作
// 对于不同的数据处理操作来说,需要把操作在这个位置调用
//processor. process
//source的意思是刚读取的数据
// 经过 IPProcessor的处理,拿到一个包含了省市,和经纬度的数据集
val dsWi|thIP= IPProcessor. process(source)
3、在 processor 当中要做什么事情就比较明确了。首先,把一个没有省市信息和经纬度的数据集加入这些信息,再返回。这样,就需要在 process 当中具体的做这些事。
原始数据集:c1,c2,c3,…,c4,c5,ip
这些是什么列并不关心 ,关心的是它里面必须要有的列叫做 ip,要 有 IP 才能够进行处理。转换过的数据集,原来的数据都要有,但是要在后面加上 region,city,longitude,Latiude。
转换过的数据集:c1,c2,c3,…,c4,c5,ip,region,city,longitude,Latiude。
region 是省,city 是市,longitude,Latiude 是经纬度信息。就是等于在后面追加四个列。已经明确了要做的事情,再回顾一下步骤。首先,在 ETLRunner 当中调用 IPProcessor,IPProcessor 指的是 IPProcessor .scala。下面就可以调用 IPProcessor 当中的 process 方法,调用完以后就拿到了dswith IP,这里面包含了 IP 信息的数据集。在 IPProcessor 当中怎样做到这样一个转换?
/**
* 把一个没有省市信息和经纬度的数据集,加入这些信息,再返回
*/
object IPProcessor extends Processor{
override def process(datet:Dataset[Row]):Dataset[Row]={
//原始数据集:c1,c2,c3,……,c4,c5, ip
//转换过的数据集:c1,c2,c3,…,c4,c3, ip, region, city, longitude, latitude
}
}
4、如何实现转换?
方法一:可以把完整的数据集只生成只有 IP 的数据集,进行过滤或转换,把 IP 对象转换为四个属性( ip, region, city, longitude, latitude),接下来生成 dataset,和原始数据集按照 IP 列进行 Join。
方法二:直接向原始数据集当中插入这四列。有一个方法叫 withColumn,withColumn 是建立一个新的列,这个新的列要使用横竖去生成,所以需要编写 UDF,编写 UDF 生成新的列,在 spark dataset 章节进行过介绍。
原始的数据集当中有这么多列,他其实是有 schema 信息的,scheam 信息中只有这几个列。
//原始数据集:c1,c2,c3,...,c4,c5,ip -> schema
如果直接往里面添加四个新的列,其实就是在原始数据集 row 当中添加四条数据,这个时候,原始的 schema 会跟着改变。
方法三是生产新的数据,修改原始的 schema。此处采用方法三,因为修改 schema 信息,也是一个比较常见的操作。
//如何实现转换?
//方法一:生成只有IP的数据集,转换,把IP对象转换为( ip, region, city, Longitude, Latitude)
//接下来,生成 Dataset,和原始数据集按照 IP 列对来进行 Join
//方法二:能否直接插入四列?withCotumn,编写 UDE
//方法三:生成新的数据,然后修改原始的 schema
//此处采用方法三,因为修改 scheme 信息,也是一个比较常见的操作