7. Graph Operators
1. Property Operators
These are analogous to the map operation on RDDs: mapVertices and mapEdges transform vertex and edge attributes without changing the graph structure.
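All of the transcripts in this section operate on the same small example graph. For reference, here is a minimal sketch of how such a graph could be built in spark-shell; the vertex and edge data are reconstructed from the outputs shown below, so treat this as an assumption rather than the original setup code.

import org.apache.spark.graphx.{Edge, Graph}

// vertices: (id, (city, tmp)) -- values taken from the res outputs below
val vertices = sc.parallelize(Array(
  (1L, ("南京", 40)), (2L, ("天津", 12)), (3L, ("北京", 15)),
  (4L, ("上海", 38)), (5L, ("杭州", 23))))

// edges: Edge(srcId, dstId, attr) -- values taken from the res outputs below
val edges = sc.parallelize(Array(
  Edge(1L, 2L, 500), Edge(1L, 3L, 600), Edge(2L, 4L, 600),
  Edge(3L, 2L, 100), Edge(3L, 4L, 1000), Edge(3L, 5L, 800)))

val graph = Graph(vertices, edges)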
scala> graph.mapEdges(e=>(e.srcId,e.dstId,e.attr+50)).edges.collect
res16: Array[org.apache.spark.graphx.Edge[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)]] = Array(Edge(1,2,(1,2,550)), Edge(1,3,(1,3,650)), Edge(2,4,(2,4,650)), Edge(3,2,(3,2,150)), Edge(3,4,(3,4,1050)), Edge(3,5,(3,5,850)))

// pattern-matching (destructuring) syntax
scala> graph.mapVertices{case (vid,(city,template))=>(vid,(city,template+3))}.vertices.collect
res17: Array[(org.apache.spark.graphx.VertexId, (org.apache.spark.graphx.VertexId, (String, Int)))] = Array((4,(4,(上海,41))), (1,(1,(南京,43))), (3,(3,(北京,18))), (5,(5,(杭州,26))), (2,(2,(天津,15))))
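The `case` block above is pattern-matching (destructuring) syntax. The same transformation can be written without destructuring by accessing the tuple fields directly; a small sketch:

// equivalent to the destructuring version: attr is the (String, Int) vertex attribute
graph.mapVertices((vid, attr) => (vid, (attr._1, attr._2 + 3))).vertices.collect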
2. Structural Operators
// subgraph: extract a smaller graph from a larger one.
// Both of its parameters, vpred and epred, are functions (predicates).

// vpred passed as an anonymous function literal
scala> graph.subgraph(vpred=(id,attr)=>attr._2>35).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))

// epred
scala> graph.subgraph(epred=x=>x.attr<500).triplets.collect
res15: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((3,(北京,15)),(2,(天津,12)),100))

// to see what "passing a function as an argument" means, define one explicitly first
scala> def xxx(a:Long,b:Tuple2[String,Integer]):Boolean={
     |   b._2<500
     | }
xxx: (a: Long, b: (String, Integer))Boolean

scala> println(xxx(10L,("abc",300)))
true

// the abc function is passed into subgraph as a value
// (requires: import org.apache.spark.graphx.VertexId)
scala> def abc(id:VertexId,attr:Tuple2[String,Int]):Boolean={
     |   attr._2>35
     | }

scala> graph.subgraph(vpred=abc).vertices.collect
res19: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))
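vpred and epred can also be supplied together in a single subgraph call; a sketch (the thresholds here are illustrative and not taken from the transcript above):

// keep only vertices with tmp > 10 and edges with attr < 700;
// subgraph also drops any edge whose endpoints were filtered out by vpred
graph.subgraph(
  epred = t => t.attr < 700,
  vpred = (id, attr) => attr._2 > 10
).triplets.collect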
3. Join Operators
Join operators load data from external RDDs and use it to modify vertex attributes.
scala> graph.triplets.collect
res20: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,40)),(2,(天津,12)),500), ((1,(南京,40)),(3,(北京,15)),600), ((2,(天津,12)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,12)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))

scala> val ngp = sc.parallelize(Array((1L,30),(2L,60)))
ngp: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:27

scala> graph.joinVertices(ngp)((id,oldval,newval)=>(oldval._1,oldval._2+newval))
res22: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@4f134331

scala> graph.outerJoinVertices(ngp)((id,oldval,newval)=>(oldval._1,newval)).triplets.collect
res24: Array[org.apache.spark.graphx.EdgeTriplet[(String, Option[Int]),Int]] = Array(((1,(南京,Some(30))),(2,(天津,Some(60))),500), ((1,(南京,Some(30))),(3,(北京,None)),600), ((2,(天津,Some(60))),(4,(上海,None)),600), ((3,(北京,None)),(2,(天津,Some(60))),100), ((3,(北京,None)),(4,(上海,None)),1000), ((3,(北京,None)),(5,(杭州,None)),800))
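The key difference between the two operators: joinVertices must return the same vertex attribute type, and unmatched vertices keep their old value, while outerJoinVertices receives an Option for every vertex and may change the attribute type entirely. A small sketch reusing the `ngp` RDD above:

// joinVertices: the result is still Graph[(String, Int), Int]
val updated = graph.joinVertices(ngp)((id, oldval, newval) => (oldval._1, newval))

// outerJoinVertices: the vertex attribute type changes to Int; -1 marks unmatched vertices
val replaced = graph.outerJoinVertices(ngp)((id, oldval, newval) => newval.getOrElse(-1))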
8. Applications of the GraphX API
- Counting each user's number of followers (see the sketch after the transcript below)
// assumes a two-field case class was defined earlier, e.g. case class City(name:String,tmp:Int)
scala> graph.mapVertices{case (id,(city,tmp))=>City(city,tmp)}.triplets.collect
res28: Array[org.apache.spark.graphx.EdgeTriplet[City,Int]] = Array(((1,City(南京,40)),(2,City(天津,12)),500), ((1,City(南京,40)),(3,City(北京,15)),600), ((2,City(天津,12)),(4,City(上海,38)),600), ((3,City(北京,15)),(2,City(天津,12)),100), ((3,City(北京,15)),(4,City(上海,38)),1000), ((3,City(北京,15)),(5,City(杭州,23)),800))

// outerJoinVertices() joins all vertices; those without a matching entry receive None
scala> val cc = sc.parallelize(Array((1L,20),(2L,30)))

// getOrElse(0): use the joined value when present, otherwise fall back to 0
scala> graph.outerJoinVertices(cc)((id,oldv,newv)=>(oldv._1,newv.getOrElse(0))).triplets.collect
res44: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,0)),600), ((2,(天津,30)),(4,(上海,0)),600), ((3,(北京,0)),(2,(天津,30)),100), ((3,(北京,0)),(4,(上海,0)),1000), ((3,(北京,0)),(5,(杭州,0)),800))

// joinVertices only updates the vertices that have a matching entry; the rest keep their old attributes
scala> graph.joinVertices(cc)((id,oldv,newv)=>(oldv._1,newv)).triplets.collect
res35: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,15)),600), ((2,(天津,30)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,30)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))

// in-degree / out-degree statistics
// case class holding city name, tmp, in-degree and out-degree
case class City(name:String,tmp:Int,inNum:Int,outNum:Int)

val find = graph.mapVertices{case (id,(city,tmp))=>City(city,tmp,0,0)}

find.joinVertices(find.inDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,newval,oldval.outNum)).joinVertices(find.outDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,oldval.inNum,newval)).vertices.collect
res43: Array[(org.apache.spark.graphx.VertexId, City)] = Array((4,City(上海,38,2,0)), (1,City(南京,40,0,2)), (3,City(北京,15,1,3)), (5,City(杭州,23,1,0)), (2,City(天津,12,2,1)))
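For the follower-count use case named above: if an edge src -> dst is read as "src follows dst", then a user's follower count is simply its in-degree, which can be attached in one step with outerJoinVertices. A minimal sketch on the same example graph:

// follower count = in-degree; vertices with no incoming edges default to 0
val followerCounts = graph.outerJoinVertices(graph.inDegrees) {
  (id, attr, inDegOpt) => (attr._1, inDegOpt.getOrElse(0))
}
followerCounts.vertices.collect
// with the example graph this yields, e.g., (2,(天津,2)) and (1,(南京,0))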