开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第七阶段):打标签_环境准备】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/682/detail/11835
打标签_环境准备
DMP 的最后一个功能就是对整个数据集打上标签,然后进行图计算把标签之间的关系找到进行一个统一的用户识别,接下来这个打标签的功能相对来说比较简单。
创建好一个类: TagRunner
在这个类当中要做如下几个步骤:
(1)创建 SparkSession
(2)读取数据
(3)标签生成
代码如下:
Package cn.itcast.tags
Import org.apache.spark.sql.SparkSession
object TagRunner {
Private val ODS_TABLE_NAME = ETLRunner.ODS_TABLE_NAME
Private val AREA_TABLE_NAME = BusinessAreaRunner.AREA_TABLE_NAME
def main(args: Arrary[String]): Unit = {
Import cn.itcast.utils.KuduHelper._
Import cn.itcast.utils.SparkConfrigHelper._
// (1)创建 SparkSession
val spark = SparkSession.builder()
.appName( name = "tag")
.master(master = "local[6]")
.loadConfig()
.getOrCreate()
Import spark.implicits._
Import org.apache.spark.squl.functions._
Val geoHash = udf(toGeoHash _)
// 直接拿到 spark 对象后使用SparkSession.builder
,接着 @ 一个 appName
命名为“tag”
,设置 master
为“local[6]”
,取getOrCreate
前需要设置一些参数,取 import 的两个工具类“cn.itcast.utils.KuduHelper._
和cn.itcast.utils.SparkConfrigHelper._”
,接下来在 “.getOrCreate()”
前给上“.loadConfig()”
,这个时候SparkSession
就创建出来了。
//(2)读取数据
val odsoption = spark.readKuduTable(ODS_TABLE_NAME)
Val areaOption = spark.readKuduTable(AREA_TABLE_HAME)
If (odsoption.isEmpty | | areaoption.isEmpty)_return
//使用spark.readKuduTable
,需要 ODS 层的表,所以直接把名字创建出来,叫做ODS_TABLE_NAME
,从 ETLRunner 中找到”ODS_TABLE_NAME”
这个属性,把ODS 表名写到spark.readKuduTable
这里;表生成后拿到一个 option ,把 option 表示出来叫做 odsoption ,这个时候就可以进行标签的生成了。
但在标签生成前有一个功能要用到商圈库,直接在读取数据这一步把商圈库也读出来。
接下来我们拿到 areaoption ,通过 spark.readKuduTable
来拿,需要把名字生成Private val
,找到对应的 area 文件里的BusinessAreaRunner
,往下找到最后一部分有一个 AREA_TABLE_NAME
的属性,直接设置为公共的让外部也可以访问,应该提供一个String ,那这样一个类型如果不提供 String 可能会有问题。接着进行AREA_TABLE_NAME
它的名字叫做BusinessAreaRunner.AREA_TABLE_NAM
E,接下来就可以进行具体的标签生成了。
在标签生成前需要有 ODS 层的数据,areaoption 的数据也得有,如果这两个数据没有的话就没有办法继续向下进行,所以直接进行一个判断,判断odsoption .isEmpty
或者areaoption.isEmpty
这样判断以后可以“return”返回 。
现在我们的 ODS 层里面是没有商圈数据的,所以应该将商圈数据加进 ODS 层里,用一个统一的数据集来进行相应的操作。
// (3)标签生成
//(3.1) 将ODS 的表加入 GeoHash
Val odsWithGeoHash = odsWithColumn(
coName = “geoHash”,geoHash(‘longitude,’latitude)
)
//(3.2) 生成一个新的数据集,这个数据集包含了商圈信息
Val odsWithArea = odsWithGeoHash.join(
areaoption.get,
OdsWidthGeoHash.col( colName = “geoHash”) === areaoption.get.col(colName = “geoHash”),
joinType = “left”
)
}
def toGeoHash(longitude: Double,latitude:Double): String = {
GeoHash.withCharacterPrecision(latitude, longitude, numberofCharacters = 8).toBase32
}
//先拿到一个Val odsWithArea = odsoption.get.join(Areaoption.get,)
的数据集,还要有一些 join 的方式,Odsoption.get.col (colName = “geoHash”) ,colName
里面写上“geoHash”,观察发现现在 ODS 层里面没有 geoHash,所以还是要再去创建一个函数,命名为 geoHash,这个函数当中接收一个 longitude ,对应的是一个 Double 类型,然后再来一个 latitude,对应的是一个 Double 类型,接下来 Unit 返回一个 String ,这个 String 就是 geoHash 的值;
接下来可以把 longitude 和 latitude 生成一个 GeoHash,把它叫做withCharacterPrecision
,把 latitude 和 longitude 传进来,再去指定生成8位“numberofCharacters = 8”
,再toBase32
。
前面做完后还应该在第一步——“创建SparkSession
”这里的“.getorcreate()”
后面先去导入对应的内容,取“Import spark.implicits._
”,再取“Import org.apache.spark.squl.functions._
”,都导入了以后就可以把这个方法生成一个“UDF”了,这里我们只需要导成函数即可,因为我们最终还是要处理“odsWithArea
”,导成函数需要拿到一个 geoHash 这样的一个函数,命名使用 udf ,这个 udf 是functions 中的函数,它可以把一个函数变成一个基础的 udf ,但是不能变成 udaf,所以把 geoHash 传进来,得到函数‘’Val geoHash = udf(toGeoHash _)”
。
拿到 geoHash 后我们第一步可以先处理一下“(3.1) 将 ODS 的表加入GeoHash”,加入 GeoHash 可以直接拿到Val odsWithGeoHash =
odsoption.get.withColumn()
,这个新的 Column命名成“geoHash”
。
命名好后就可以考虑该怎么取生成这一列的数据,就必须要使用到
geoHash 这样的一个 udf,使用过程,传进去一个 longitude 和
latitude,这个时候整个的数据集就生成了,后面我们就不再使用
odsoption,而是直接使用包含有 geoHash的 ODS 层的表;取出
“geoHash”
和 “areaoption.get
这个表里的col(colName =
“geoHash”)
”
这一列判等,左边和右边 geoHash 相等才能 ‘’
join” 起来,这时还要再去制定一个参数告诉它你是左外、右
外还是内连接,还是第二级,直接 left ,最少保留 ODS 层的所有数
据。
通过上面这一步就可以生成了“(3.2) 生成一个新的数据集,这个数据集包含了商圈信息‘’,也就是说(3.1)、(3.2)这两步都是在试图把商圈信息加到 ODS 层表当中。