【Spark】(九)Spark GraphX 图计算解析2

简介: 【Spark】(九)Spark GraphX 图计算解析2


七、图的算子



image.png


1、属性算子


类似于RDD的 map 操作

scala> graph.mapEdges(e=>(e.srcId,e.dstId,e.attr+50)).edges.collect
res16: Array[org.apache.spark.graphx.Edge[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)]] = Array(Edge(1,2,(1,2,550)), Edge(1,3,(1,3,650)), Edge(2,4,(2,4,650)), Edge(3,2,(3,2,150)), Edge(3,4,(3,4,1050)), Edge(3,5,(3,5,850)))
// 解构语法
scala> graph.mapVertices{case (vid,(city,template))=>(vid,(city,template+3))}.vertices.collect
res17: Array[(org.apache.spark.graphx.VertexId, (org.apache.spark.graphx.VertexId, (String, Int)))] = Array((4,(4,(上海,41))), (1,(1,(南京,43))), (3,(3,(北京,18))), (5,(5,(杭州,26))), (2,(2,(天津,15))))


2、结构算子


image.png


// subgraph 在大的图里截取小图  两个变量,存的都是函数
// vpred    高阶函数写法  
scala> graph.subgraph(vpred=(id,attr)=>attr._2>35).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))
// epred     
scala> graph.subgraph(epred=x=>x.attr<500).triplets.collect
res15: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((3,(北京,15)),(2,(天津,12)),100))
// 结合理解
scala> def xxx(a:Long,b:Tuple2[String,Integer]):Boolean={
     | b._2<500
     | }
xxx: (a: Long, b: (String, Integer))Boolean
scala> println(xxx(10L,("abc",300)))
true
// abc函数 当作变量传进 subgraph 函数
scala> def abc(id:VertexId,attr:Tuple2[String,Int]):Boolean={
     | attr._2>35
     | }
scala> graph.subgraph(vpred=abc).vertices.collect
res19: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))


3、Join 算子


从外部的RDDs加载数据,修改顶点属性


image.png


scala> graph.triplets.collect
res20: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,40)),(2,(天津,12)),500), ((1,(南京,40)),(3,(北京,15)),600), ((2,(天津,12)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,12)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))
scala> val ngp = sc.parallelize(Array((1L,30),(2L,60)))
ngp: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:27
scala> graph.joinVertices(ngp)((id,oldval,newval)=>(oldval._1,oldval._2+newval)) 
res22: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@4f134331
scala> graph.outerJoinVertices(ngp)((id,oldval,newval)=>(oldval._1,newval)).triplets.collect
res24: Array[org.apache.spark.graphx.EdgeTriplet[(String, Option[Int]),Int]] = Array(((1,(南京,Some(30))),(2,(天津,Some(60))),500), ((1,(南京,Some(30))),(3,(北京,None)),600), ((2,(天津,Some(60))),(4,(上海,None)),600), ((3,(北京,None)),(2,(天津,Some(60))),100), ((3,(北京,None)),(4,(上海,None)),1000), ((3,(北京,None)),(5,(杭州,None)),800))


八、GraphX API 的应用


  • 计算用户粉丝数量


image.png


scala> graph.mapVertices{case (id,(city,tmp))=>City(city,tmp)}.triplets.collect
res28: Array[org.apache.spark.graphx.EdgeTriplet[City,Int]] = Array(((1,City(南京,40)),(2,City(天津,12)),500), ((1,City(南京,40)),(3,City(北京,15)),600), ((2,City(天津,12)),(4,City(上海,38)),600), ((3,City(北京,15)),(2,City(天津,12)),100), ((3,City(北京,15)),(4,City(上海,38)),1000), ((3,City(北京,15)),(5,City(杭州,23)),800))
//outerJoinVertices()会join所有的点,没有匹配上的的返回None
scala> val cc = sc.parallelize(Array((1L,20),(2L,30)))
//getOrElse(0)匹配上的用,没有的返回None
scala> graph.outerJoinVertices(cc)((id,oldv,newv)=>(oldv._1,newv.getOrElse(0))).triplets.collect
res44: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,0)),600), ((2,(天津,30)),(4,(上海,0)),600), ((3,(北京,0)),(2,(天津,30)),100), ((3,(北京,0)),(4,(上海,0)),1000), ((3,(北京,0)),(5,(杭州,0)),800))
// join 只对相关联的代码进行操作
scala> graph.joinVertices(cc)((id,oldv,newv)=>(oldv._1,newv)).triplets.collect
res35: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,15)),600), ((2,(天津,30)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,30)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))
// 出入度统计
// 样例类
case class City(name:String,tmp:Int,inNum:Int,outNum:Int)
val find = graph.mapVertices{case (id,(city,tmp))=>City(city,tmp,0,0)}
find.joinVertices(find.inDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,newval,oldval.outNum)).joinVertices(find.outDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,oldval.inNum,newval)).vertices.collect
res43: Array[(org.apache.spark.graphx.VertexId, City)] = Array((4,City(上海,38,2,0)), (1,City(南京,40,0,2)), (3,City(北




目录
相关文章
|
2月前
|
存储 并行计算 前端开发
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(二)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
41 1
|
2月前
|
数据安全/隐私保护 C++ 容器
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(一)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
53 0
|
1月前
|
存储 编译器 Linux
【C语言】自定义类型:结构体深入解析(二)结构体内存对齐&&宏offsetof计算偏移量&&结构体传参
【C语言】自定义类型:结构体深入解析(二)结构体内存对齐&&宏offsetof计算偏移量&&结构体传参
|
2月前
|
存储 算法 Serverless
【软件设计师备考 专题 】数据结构深度解析:从数组到图
【软件设计师备考 专题 】数据结构深度解析:从数组到图
58 0
|
2月前
|
存储 算法 编译器
【C++ 泛型编程 进阶篇】C++模板元编程深度解析:探索编译时计算的神奇之旅
【C++ 泛型编程 进阶篇】C++模板元编程深度解析:探索编译时计算的神奇之旅
120 0
|
2月前
|
开发工具 数据安全/隐私保护 Android开发
视觉智能平台常见问题之图片解析出的水印图判断是自己添加的水印图如何解决
视觉智能平台是利用机器学习和图像处理技术,提供图像识别、视频分析等智能视觉服务的平台;本合集针对该平台在使用中遇到的常见问题进行了收集和解答,以帮助开发者和企业用户在整合和部署视觉智能解决方案时,能够更快地定位问题并找到有效的解决策略。
24 1
|
6月前
|
分布式计算 Java Spark
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
35 0
|
2月前
|
算法 Java
Java必刷入门递归题×5(内附详细递归解析图)
Java必刷入门递归题×5(内附详细递归解析图)
26 1
|
7月前
|
Cloud Native Go 开发工具
如何让CSDN学习成就个人能力六边形全是100分:解析个人能力雷达图的窍门
如何让CSDN学习成就个人能力六边形全是100分:解析个人能力雷达图的窍门
140 0
|
3月前
|
弹性计算 大数据 测试技术
阿里云服务器服务费怎么计算?详细解析2024新版
阿里云服务器服务费怎么计算?详细解析2024新版,云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月
59 1

推荐镜像

更多