开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第七阶段):打标签_完成】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/682/detail/11837
打标签_完成
(3.5)生成 mainId
上节3.4这个步骤当中,我们把标签数据生成并且拼接到了一个
Tags 可变的 Map 当中,接下来,我们就要去生成 mainId,如何
生成呢?
应按照一定的顺序去生成,后面可能会生成很多个这样的一些对象叫
做 IdsWithTags,如果这个 mainId 生成的规则不一样的话,在使
用这个 mainId 进行合并、图计算的时候会带来非常大的困扰,所
以 mainId 必须按照一定的顺序去生成。所以这样我们再写一个方
法专门去生成 mainId。
// mainId 的生成需要按照一个顺序来进行
第一步:Val ids = genIdMap(row)
第二步:Val mainId = getMainId(ids)
IdsWithTags
(mainId, ids, tags.toMap)
// 返回一个IdsWithTags
对象,第一个数据是 mainId, 第二个属性是 ids, 第三个属性是 tags, 这里的 tags 是一个不可变的 map, 所以需要 toMap 的方法转为不可变的 Map,再设置进去,此时标签就打完了。
}
(注意到(case class IdsWithTags(mainId: String, ids:
Map[String, String], tags:Map[String,Int]
)
中, IdsWithTags 当
中还有一个 ids,这个 ids 当中需要一个 map, 所以可以先把
Row 转为 map 形式的ids。)
// 先把Row 转为 Map 形式的 ids 方法
(先把方法写出来,这个方法命名为 genMainId)
def genMainId(row: Row): Map[String,String] = {
val keyList = List(
"imei", "imeimd5", "imeisha1", "mac", "macmd5", "macsha1", "openudid", "openudidmd5", "openudidsha1", "idfa", "idfamd5", "idfasha1"
)
keyList.map(key => (key, row.getAs[String](key)))
.filter(kv => StringUtils.isNotBlank(kv._2))
) .toMap
}
// 接收一个 Row 对象,返回 Map ,Map 是 String 的key 也是
value, 是我们最终要设置给 ids 中 Map 的值;先把 Map 写出
来,后面这个方法就有解了;拿到 keyList,取 keyList.map,
map 从 Row 当中取对应的 key, 生成的数组是一个 String, 此
时keyList 原来是 key,现在变成 value,进行简单的 filter, 可以
判断 value 是否为空,通过 StringUtils.isNotBlank判断,如果
是空我们就不把它加到数据集当中,把 value 传进来判断是否为空
或者空白,这个地方如果返回 true 的话,value 就会被加进去,所
以需要在 is 后面加一个“Not”。过滤后就没有不存在的 value
了。此时只剩下一个 value 了,
最终我们要得到一个 Map 返回,所以需要生成一个元组,把 key 作为 key, 把 value作为 value 的元组;value 改为 kv, 另一个 value 改为 kv._2, 其中“_2”下划线2取的就是“row.getAs[String](key) ”这个 value 的值, 使用 toMap 直接把它转为一个Map。(注:如果跟 List 当中是一个 kv 类型的或者是一个二元元组的是可以直接转为 Map的 )
def getMainId(ids: Map[ String, String]): String = {
val keyList = List(
"imei", "imeimd5", "imeisha1", "mac", "macmd5",
"macsha1",
"openudid", "openudidmd5", "openudidsha1",
"idfa","idfamd5", "idfasha1"
)
// def getMainId
这个方法当中接收一个 Row 对象,就拿到一个
Row 对象,里面有哪些字段可以作为 ID ?
字段表示为 keyList, keyList
当中给出一个 List,第一项应该是什
么?这里的 key 就不再自己手写,直接去原来代码里复制,顺序也
不需要手动去调整,直接拿过来即;此时 List 另起一行把数据放进
来,“imei”是可以作为 ID 的,那么 imei 的"imeimd5",
"imeisha1"
值也可以作为 mainId,"mac", "macmd5",
"macsha1", "openudid", "idfa",
都可以作为 ID,就按照这样的顺
序来取 mainId。
如何取?例:Row 里面有imei 就在直接把 imei 的值返回,它就
是什么 id, 所以返回值应该是一个 String 类型;如何进行相应的取得?
首先拿到 keyList,map 一下,这个 map 把它转为value 值,通过拿到 row 取出 getAs 其中的内容,但这个做法不安全,可以改为 filter, 当中接收到一个 key 信息,拿到 key 后,通过 Row 来判断其中有没有这个 key, 正常情况下可以在其它的集合当中使用 contains 来进行判断,但是 Row 当中没有这个内容,但在 Row 当中是可以通过 fieldIndex 的属性来进行判断,把对应的 key 传进去,点开 fieldIndex, 如果 schema 不存在,它就报一个UnsupportrdOperationException,
如果传的 a field 不存在,它就报一个IllegalArgmentException
。所以会报错,如果你判断这个数据是否存在,还报错,这样比较麻烦。所以先把 keyList.filter(key => row.fieIdIndex())
这一小节放在这,先做下一个功能,但大家要知道 mainId 是通过这个方法来获取的。
keyList.map(key => ids.get(key))
.filter(option => option.isDefined)
. Map(option => option.get)
.head
}
// 直接在 ids 当中取 get(key), 这个 get 的返回值是 option,
进行过滤的时候可以直接 filte 拿到 option, 如果已经定义过就把
它加到数据集当中。这个时候取的 head 还是一个 option, 还要再
进行一次转换拿到 option 把它当中的数据 get 出来, 此时再去
head 拿到的就是String ,返回后就能得到 mainId。然后就通过
getMainId 传进去一个 ids。
把无关的方法折叠掉后的部分代码:
import ch.hsr.geohash.GeoHash
import cn.itcast.area.BusinessAreaRunner
import cn.itcast.etl.ETLRunner
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.(DataFrame, Dataset, Row, SparkSession)
import scala.collection.mutable
object TagRunner (
private val ODS_TABLE_NAME =
ETLRunner.ODS_TABLE_NAME
private val AREA_TABLE_NAME =
BusinessAreaRunner.AREA_TABLE_NAME
def main(args: Array[String]): Unit = {
import...
// 1. 创建 SparkSession
val spark = SparkSession.builder()
.appName( name = "tag")
.master( master =“local[6]"
)
.loadConfig()
.getOrCreate()
import...
val geoHash = udf(toGeoHash_)
// 2. 读取数据
val odsoption = spark.readKuduTable(ODS_TABLE_NAME)
val area0ption = spark.readKuduTable(AREA_TABLE_NAME)
if (odsOption.isEmpty | | areaoption.isEmpty) return
val odsWithGeoHash = odsoption.get.withColumn(...)
// (3.2) 生成一个新的数据集,这个数据集包含了商圈信息
val odsWithArea: DataFrame = odsWithGeoHash.join(...)
//(3.3) 生成标签数据
Val tags: Dataset[IdsWithTags] =
odsWithArea.map(createTags)
Tags.show()
}
/**...*/
def createTags(row: Row): IdsWithTags = {...}
def genIdMap(row: Row): Map[String, String] = {...}
def getMainId(ids: Map[String, String]): String = {...}
def toGeoHash(longitude: Double, latitude: Double): String = {...}
}
case class IdsWithTags(mainId: String, ids: Map[String,
String], tags: Map[String, Int])
(所有 ID 和 所有标签都是Map 类型的)
// (3.4.3) 关键词 keywords -> 帅哥,有钱 转换为:
row.getAs[String]( fieldName = “keywords”).split(regex = “
,”)
.map(
“KW”+_ -> 1)
.foreach(tags += +_)
// (3.4.7) 商圈标签,正常情况下在这里需要生成 GeoHash,然后再查询 Kudu 表,获取数据
//优化写法,直接先进行 join,把商圈信息添加进来,然后直接取
转换为:
row.getAs[String]( fieldName = “area”).split( regex = “
,”)
tags +=
(“AE”+ row.getAs[String]( fieldName = “area”)->1)
.map
(“A”+ _-> 1)
.foreach(tags += _)
把结果运行出来,第一个 mainId 对应的是没有问题的,如果没有 uuid、mac
地址的话可能是其他的mainId, 第二列 ids,mac 地址对应的就是52:54,openudid 对应的是 p,imei 对应的是238,前面也是238开头,openudid 是 Y 开头,前面对应的也是 Y,说明数据是没错的,tags 也有,但是 Map 稍微有一点问题,是因为有两个地方没有设置前缀,再进入到方法当中;
第一个没有给前缀的地方就是关键词这里应该给一个前缀叫做 .map(“KW”+ _ -> 1)
.foreach(tags +=”KW”+= _)
第二个地方为 .mp(“A”+ _ ->)
.foreach(tags += _)