开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段):IP 转换_功能实现】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/680/detail/11812
IP 转换_功能实现
功能实现
1、思路已经梳理过了,下面直接编写代码。首先,拿到原始的数据集,原始的数据集里有 row 对象。第一步是把 dataset 转化为一个新的数据集,添加 region,city,longitude,Latiude 这四列数据。这四列数据添加以后,第二步是生成新的schema。第三步是使用数据( RDD ),加上新的 Schema,生成新的 Dataset,这就是大致的三个步骤。
//1.把 databaset 转为新的一个数据集,添加数据, region, city, longitude, latitude
//2.生成新的 Schema
//3.使用数据( RDD ),加上新的 Schema ,生成新的 Dataset
2、下面来做第一步,把 dataset 转为一个新的数据集。要知道一件事情,要生成 region,city,longitude,Latiude 这四列数据,需要使用到 ip2region 和 GeoLite。打开 test 类,找到 IPTest,对于 ip2region 需要生成 DbSearcher,对于 ip2Location 需要生成 LookupService。第一个 ip2region,在 btreeSearch 的时候出现了 btree,也就是在内存里面生成了一个新的数据集,这个数据结构是btree 的数据结构,所以在加载这个配置文件,加载这个数据库文件的时候,是把数据库文件加载到内存当中了。
下面来看 ip2Location ,LookupService 明确指定了数据缓存在什么地方,这样就意味着 LookupService 当中也会进行数据的缓存。如果是这样的,在IPProcessor 当中使用 Map 还是 MapPartitions,应该使用 MapPartitions 。如果在每一个 Map 当中都创建 LookupService 和 DbSearcher,整个效率就会很低,因为创建了很多额外的结构。所以把 dataset 转为一个新的数据集,添加数据,不能在 dataset row 里直接添加,需要生成 rdd,dataset.rdd 直接生成,里面还是 row,拿到 rdd 后 mapParttitions,mapParttitions 接收一个函数,这个函数直接创建一个方法来代替。
3、方法作为 IP,convertIP,convertIP 应该接收什么参数?把 convertIP 放在 mapParttitions 的括号里。
这个时候在报错,参数不对,参数应该是 iterator 类型,iterator 里面应该放的是 Row,rdd 也是 row 类型,在 mapParttitions 的时候会把一个分区的数据全部返回,他返回的就是你想返回的东西。首先必须是 Iterator,其次,它里面是什么类型 dataset 就会被转为什么类型,可以把它设置为 Row,使用 row 这个对象进行处理,传进来是 row,传出去还是 row,这个时候就得到了一个 rdd,这个 rdd 就是 converted,converted 类型是 rdd,里面放置了 row。
4、ConvertIP 方法该怎样进行处理,这个方法第一步生成省市信息,第二步生成经纬度信息,这就是两大步骤。怎样生成省市信息?val dbsearcher = new Dbsearcher(),Dbsearcher 当中接收两个参数,第一个是 Dbconfig,第二个要指定数据集在哪?数据集在 dataset 下 一个叫 ip2region.db 里。接下来要生成经纬度的信息,创建 service,val lookupService = new LookupService,LookupService 当中接收两个参数,第一个是数据库文件,在 dataset 下叫 GeoLiteCity.dat。接下来还要指定 LookupService 缓存到哪里?把它缓存到 MEMORY,缓存到内存当中。Dbsearcher 和 lookupService 这两个东西都生成了,就可以进行具体的省市信息的生成。
def convertIP(iterator:Iterator[Row]):Iterator[Row]={
//1.生成省市信息
val dbsearcher=new DbSearcher(new Dbconfig(), dbFile="dataset/ip2region. db")
//2、生成经纬度信息
val lookupService=new Lookupservice(
databaseFile= "dataset/GeoLiteCity. dat",
LookupService.GEOIP_MEMORY_CACHE
)
5、拿 iterator 里面的 row 进行处理,生成省市信息。iterator .map(),给进来一个 row,再返回一个 row。第一步还是要先拿到 IP,在 row 中 getAs 获取IP,返回的类型是 string 类型,拿到 IP 以后就可以使 dbsearcher 搜索 city。用 dbSearcher.btreeSearch 生成 region,把 ip 传进去,传进去以后就拿到了 region 信息。最终目的是要有省的信息,getregion 获取 region 信息,getregion 拿到的是字符串,前面有通道符进行分隔,前面也说到过取第几项第几项是做什么。其实 regionALL 按照通道符来进行切分,切分的时候不要忘了,如果想对通道符来进行切分,他在正则当中是有特殊含义的,所以要把它转一下意。转完意以后就可以取其中的第几项,第一项是中国,第二项是中国的代号,第三项是省,第四项是市,所以第三项的下标为2。val city=regionAll. split(regex="\\|")(3),region 和 city 已经获取到了,下面应该获取 longitude,获取 longitude 的十分简单,直接使用 lookupservice 找到相应的信息。location 可以通过 lookupservice.getLocation,getlocation 的操作要把 IP 传进去,把IP传进来以后就获取到 location对象了,location 对象当中有一个叫 longitude,还有一个叫 latitude,这个时候省市、经度、纬度都已经有了。下面要生成一个新的 row,做这个事情的本质就是把 region,city,longitude,Latiude 拼到 row 后面。
iterator. map(row = > {
val ip = row.getAs[string](fieldName="ip")
val regionAll= dbSearcher. btreesearch(ip).getRegion
val region=regionAll. split(regex="\\|")(2)
val city=regionAll. split(regex="\\|")(3)
val location=lookupservice.getLocation(ip)
val longtitude=location. longitude
val latitude=location. latitude
6、row 里面并没有特别多的地方需要修改。有一张表,有第一行,第二行,第三行,第一列,第二列,第三列,这就是 row 的类型。Row 对象就是第一个数据加上第二个数据,第三个数据,再加上第四个数据,如果有一种办法把 row 对象变成 seq,就把1234当成一个数组,把 region、city和 longitude、latitude,拼到第四列的后面,再返回生成一个新的 row,该怎样操作?
row.toSeq ,使用 toSeq 以后生成了一个 seq 对象,现在要把单条数据加到 seq 对象后,使用冒号加第一个数据 region,在使用冒号加第二个数据 city,在使用冒号加第三个数据 longitude,最后加第四个数据 latitude,这时候就生成了一个新的 seq。拿到 rowSeq 以后还是要转回一个 row 对象,现在已经把这四项数据拼到了原始 row 代表的 seq 对象。接下来就可以使用 row 去 fromseq,在通过 rowseq 生成一个新的 row 对象,这样 convertIP 就完成了。拿到 iterator row 对每一行数据添加了 region,city,longitude,Latiude。
def convertIP(iterator:Iterator[Row]):Iterator[Row]={
//1.生成省市信息
val dbsearcher=new DbSearcher(new Dbconfig(), dbFile="dataset/ip2region. db")
//2、生成经纬度信息
val lookupService=new Lookupservice(
databaseFile= "dataset/GeoLiteCity. dat",
LookupService.GEOIP_MEMORY_CACHE
)
iterator. map(row = > {
val ip = row.getAs[string](fieldName="ip")
val regionAll= dbSearcher. btreesearch(ip).getRegion
val region=regionAll. split(regex="\\|")(2)
val city=regionAll. split(regex="\\|")(3)
val location=lookupservice.getLocation(ip)
val longtitude=location. longitude
val latitude=location. latitude
val rowSeq=row. toseq:+region:+city:+longitude:+latitude
Row. fromSeq(rowSeq)
})
7、第一步已经完成了,下面要生成一个新的 schema 信息。为什么要生成一个新的 scheme 信息呢?再画一个简单的图,现在已经有 row 对象了,row 对象当中有第一个数据,第二个数据,第三个数据,第四个数据,但是并不知道这些数据叫什么名字,只是拿了一个 seq 生成了一个新的 row 对象。
在这一步生成的时候,并没有 schema 信息,所以不能直接判定 row 里面已经有 schema 了,要生成一个新的 scheme。生成新的 schema 其实就是生成四个列的 schema 信息。就是省市、经度、纬度这四个列的 schema 信息,把他生成出来添加到原来的 schema 后面。既然是添加到原来的 schema 后面,该怎样做?办法也非常简单,直接 dataset.schema,名称是 region,类型是 string,导入 stringType。注意不要导第一个,第一个是 avro 的类型,要导第二个 stringType。
继续添加一个新的 city,city 也是 string 类型,再接下来添加 longitude,longitude 是 double 类型,最后添加纬度,也是 double 类型。这四个新的列已经添加到 schema 上了,这样就得到了一个经过修改的 schema 对象。
//1、把databaset转为新的一个数据集,添加数据, region, city, longitude, latitude
//使用 Map 还是 MapPartitions
val converted:RDD[Row]=databaset. rdd.mapPartitions
(convert IP)
//2.生成新的 schema
val scheme=dataset. schema
. add(name="region",stringType)
. add(name="city",stringType)
. add(name="longitude",DoubleType)
. add(name="latitude",DoubleType)
//3.使用数据( RDD ),加上新的 Schema ,生成新的 Dataset
}
拿到这个对象以后,数据也有了,RDD 也有了,用来修饰 RDD 的 schema 也有了,这样就可以创建一个新的 dataset 返回了。直接使用 dataset.sparkSession 当中的 createDataFrame,传入 converted add 的数据,这样就生成了一个新的 dataset,或者叫做 datasetWithIP。返回 datasetWithIP,这样整个IP转换的功能就写完了。
//3.使用数据( RDD ),加上新的 Schema ,生成新的 Dataset
val datasetWithIP=dataset. sparksession.createDataFrame
(converted, schema)
datasetWithIP
}
8、看起来很复杂,但是自己理顺思路,并不是很复杂。过一下整体上的步骤,第一步要把 dataset 转为 RDD,dataset 当中放的是 row,rdd 放的也是 row,所以就要把 dataset 当中的 row 对象添加四列数据,每一个row 对象当中添加四列数据。在 iterator.map 外层创建 dbsearch 和 lookupservice,然后在内层先获取 region、city 、经度、纬度,然后拼到原始的 row 的数据后面,再通过新的 seq 对象,生成 row 对象返回,返回完以后 RDD 就算生成了。在整个代码的最底部,单独处理 IP 数据的方法类,找到 process 的方法类,这个方法类就能获取到转换过的 RDD 数据。把原始的 schema 当中也添加四个列,通过 schema 加 rdd 创建新的 dataset,以上就是整体步骤。