开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第七阶段):统一识别-标签聚合】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/682/detail/11841
统一识别-标签聚合
内容介绍:
一、简介
二、操作
三、小结
一、简介
图计算结果已经生成,接着来看聚合。首先输入 comments.show() 查看数据集,先去运行代码,
结果如下
能够注意到第一列是id,第二列是ids,第三列是tags,第四列叫 comment。component是连通图的一个id。
可以理解哪些数据在一张图当中,这些数据一定是同一个人的。因为这些id之间是有联系的。
所以只需要先按照component来进行分组。分组完以后,把分组下的所有ids和tags来进行聚合,主id此时无关紧要,只要能拿到所有的id,那主id随时可以生成。
然后tags要合并起来,按照 component 进行分组。
二、操作
1.分组
// 3、聚合
第一步输入components.groupByKey() ,可以进行一些有类型的操作。
如果要进行有类型的操作,需要把components转成其它的类型。复制Vertex(id:String,ids:Map【String, :String】, tags:Map【String, :String】componentLong)因为联通图中基本是Vertex的内容,不过最后一步追加component。id对应的是Long类型。
接下来就可以拿新类型代替旧类型,在.run后面加
as[
Vertex
Component]:
Valcomponents:Dataset[
VertexComponent
]=GraphFrame(vertex.toDF(),edge.toDF()).connectedComponents.run
().as[VertexComponent]
,此时Vertex就是component类型。
也可以整体上折行,看到component其实是Dataset,里面的数据是VertexComponent。
那么在进行groupByKey
时也是component
,可以从component当中取出。
components.groupByKey(component=>component.component)
此时按照里面的 component 来进行分组。
分组以后进行聚合,先表示数据集。
val grouped = components.groupByKey(component => component.component)
意思是把每一个组进行reduce。前面是按照 Component 来进行分组的。按照component分组以后,每一个 component 对应多条数据。那么接下来只需要去把每一组的 Vertexcomponent 对象折叠,变成一个对象即可。
val agg:Dataset[(Long,=VertexComponent)] = grouped
.
reduceGroups(
reduceVertex_
def reduceVertex(curr:VertexComponent, mid: VertexComponent): Verte
x
Component
=(
reduce
Gr
oups
创建方法reduceVertex
,
reduceVertex
中接收两个参数 curr 和 mid,对应的类型都是VertexComponent,此时可让 reduceGroups 接收 reduceVertex,agg 里面是一个元组,agg 是聚合过后的数据集。
2.reduce
统计后还有工作要做,但是先进行reduce,需要生成VertexComponent,接收id, ids, tags 三个参数。
val id = curr.id
val ids = curr.ids ++ mid.ids
VertexComponent
(
id, ids, tags, curr.component
)
}
}
(1)id
id=curr.id,然后
ids = curr.ids ++ mid.ids
,
ids
生成。
(2)tags
// Map(a -> 1, b -> 2) ++ Map(c -> 3, a->4)
//结果是Map(a ->4
,
b->2
,
c-> 3)
问题是新值会覆盖原值。
如果val tags =
curr.tags ++ mid.tags
那么此时只要curr.tags
、
mid.tags
里面有重复,那么后者必须替代前者,会丢失一些标签。正确做法应该把它们相加起来。
举例说明:
//Map(A20 -> 1) ++ Map(A20 -> 1)
“年龄20,权重1”
//结果应该是:M
ap
(A20->2)
,
但是事实上直接加加,会生成错误的(A20->1)。
此时可以对curr.tags处理。写 case 时,每一个 tag 对应每一条数据map,里面是key, value),所以case (key, value)
,判断mid 是否 contains ,对应 key 是否存在,如果有 key, 就返回一个元组,元组里 key 还是 key, 但是 value 可以加上1或者mid.tags.get(key)
,返回 opention 时可以直接.get; else
的情况下不变还是返回 key。
val temp = {
curr.tags.map {
case (key, value) => if (mid.tags.contains(key)) (key, value + mid.tags.get(key).get) else (key, value)
}
这种合并缺点是会丢失一些数据,不写else,一条数据是会被丢弃掉的。所以else一定要去写。如果没有匹配到对方数据,数据还要原路返回。
如果不写else,那么拿curr中间结果,相加中间结果:curr++temp。此时会把temp里面的东西替换给curr并保持curr里面内容不变。
两种方式都可以,采用第一种方式。写上else后其实还有问题。
//Map(A20 -> 1) ++ Map(A20 -> 1)
//Map(A20 -> 1) ++ Map(A20 -> 1,
KWshuage-> 1
)
假如后面多KWshuage-> 1,也就是后者map当中有一个数据,前者没有即使再进行ios,这条数据也能加进来。也就是说它只会包含curr有的数据,对于 curr 当中没有的数据不能加入。
此时应该使用temp
+
+mid
, 来进行相应的操作。但是 mid 应该是放在 temp 前面。因为后者里面的内容会替换前者里面的内容,如果把 mid 放在 temp 后面,mid 就会被 temp 替换。
报红线的原因很简单,就是因为少写了tags
:
midtags ++temp
tag生成, tag生成后传到vertex
component
里。但是它里面还差数据component id
。设置component id
。
三、小结
返回一条新的数据。
agg 的类型:
首先里面是元组,这个元组第二项是vertexcomponent
。不是需要maind,tags
数据。但是聚合以后,形成数据集的格式(
Long, vertexcomponent
)
不是很正确,里面放的是元组,应该把元组的内容同样聚合掉。