开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第七阶段):统一识别-标签聚合】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/682/detail/11841
统一识别-标签聚合
内容介绍:
一、简介
二、操作
三、小结
一、简介
图计算结果已经生成,接着来看聚合。首先输入 comments.show() 查看数据集,先去运行代码,
结果如下
能够注意到第一列是id,第二列是ids,第三列是tags,第四列叫 comment。component是连通图的一个id。
可以理解哪些数据在一张图当中,这些数据一定是同一个人的。因为这些id之间是有联系的。
所以只需要先按照component来进行分组。分组完以后,把分组下的所有ids和tags来进行聚合,主id此时无关紧要,只要能拿到所有的id,那主id随时可以生成。
然后tags要合并起来,按照 component 进行分组。
二、操作
1.分组
// 3、聚合
第一步输入components.groupByKey() ,可以进行一些有类型的操作。
如果要进行有类型的操作,需要把components转成其它的类型。复制Vertex(id:String,ids:Map【String, :String】, tags:Map【String, :String】componentLong)因为联通图中基本是Vertex的内容,不过最后一步追加component。id对应的是Long类型。
接下来就可以拿新类型代替旧类型,在.run后面加
as[VertexComponent]:
Valcomponents:Dataset[VertexComponent]=GraphFrame(vertex.toDF(),edge.toDF()).connectedComponents.run().as[VertexComponent],此时Vertex就是component类型。
也可以整体上折行,看到component其实是Dataset,里面的数据是VertexComponent。
那么在进行groupByKey时也是component,可以从component当中取出。
components.groupByKey(component=>component.component) 此时按照里面的 component 来进行分组。
分组以后进行聚合,先表示数据集。
val grouped = components.groupByKey(component => component.component)
意思是把每一个组进行reduce。前面是按照 Component 来进行分组的。按照component分组以后,每一个 component 对应多条数据。那么接下来只需要去把每一组的 Vertexcomponent 对象折叠,变成一个对象即可。
val agg:Dataset[(Long,=VertexComponent)] = grouped.reduceGroups(reduceVertex_
def reduceVertex(curr:VertexComponent, mid: VertexComponent): VertexComponent =(
reduceGroups
创建方法reduceVertex,reduceVertex 中接收两个参数 curr 和 mid,对应的类型都是VertexComponent,此时可让 reduceGroups 接收 reduceVertex,agg 里面是一个元组,agg 是聚合过后的数据集。
2.reduce
统计后还有工作要做,但是先进行reduce,需要生成VertexComponent,接收id, ids, tags 三个参数。
val id = curr.id
val ids = curr.ids ++ mid.ids
VertexComponent(id, ids, tags, curr.component)
}
}
(1)id
id=curr.id,然后ids = curr.ids ++ mid.ids,ids生成。
(2)tags
// Map(a -> 1, b -> 2) ++ Map(c -> 3, a->4)
//结果是Map(a ->4, b->2,c-> 3)
问题是新值会覆盖原值。
如果val tags =curr.tags ++ mid.tags
那么此时只要curr.tags 、 mid.tags里面有重复,那么后者必须替代前者,会丢失一些标签。正确做法应该把它们相加起来。
举例说明:
//Map(A20 -> 1) ++ Map(A20 -> 1)“年龄20,权重1”
//结果应该是:Map(A20->2),
但是事实上直接加加,会生成错误的(A20->1)。
此时可以对curr.tags处理。写 case 时,每一个 tag 对应每一条数据map,里面是key, value),所以case (key, value),判断mid 是否 contains ,对应 key 是否存在,如果有 key, 就返回一个元组,元组里 key 还是 key, 但是 value 可以加上1或者mid.tags.get(key),返回 opention 时可以直接.get; else的情况下不变还是返回 key。
val temp = {
curr.tags.map {
case (key, value) => if (mid.tags.contains(key)) (key, value + mid.tags.get(key).get) else (key, value)
}
这种合并缺点是会丢失一些数据,不写else,一条数据是会被丢弃掉的。所以else一定要去写。如果没有匹配到对方数据,数据还要原路返回。
如果不写else,那么拿curr中间结果,相加中间结果:curr++temp。此时会把temp里面的东西替换给curr并保持curr里面内容不变。
两种方式都可以,采用第一种方式。写上else后其实还有问题。
//Map(A20 -> 1) ++ Map(A20 -> 1)
//Map(A20 -> 1) ++ Map(A20 -> 1,KWshuage-> 1)
假如后面多KWshuage-> 1,也就是后者map当中有一个数据,前者没有即使再进行ios,这条数据也能加进来。也就是说它只会包含curr有的数据,对于 curr 当中没有的数据不能加入。
此时应该使用temp++mid, 来进行相应的操作。但是 mid 应该是放在 temp 前面。因为后者里面的内容会替换前者里面的内容,如果把 mid 放在 temp 后面,mid 就会被 temp 替换。
报红线的原因很简单,就是因为少写了tags:midtags ++temp
tag生成, tag生成后传到vertexcomponent里。但是它里面还差数据component id。设置component id。
三、小结
返回一条新的数据。
agg 的类型:
首先里面是元组,这个元组第二项是vertexcomponent。不是需要maind,tags数据。但是聚合以后,形成数据集的格式(Long, vertexcomponent)不是很正确,里面放的是元组,应该把元组的内容同样聚合掉。

