文章目录
前言:为什么需要图计算
一、Spark GraphX 概述
二、图的术语
三、图的经典表示法
四、GraphX 核心抽象
五、GraphX API
六、属性图应用示例
七、图的算子
1、属性算子
2、结构算子
3、Join 算子
八、GraphX API 的应用
前言:为什么需要图计算
- 许多大数据以大规模图或网络的形式呈现
- 许多非图结构的大数据,常会被转换为图模型进行分析
- 图数据结构很好地表达了数据之间的关联性
一、Spark GraphX 概述
- 图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构。
Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。
那么什么是图,都计算些什么?众所周知社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博和微信等,数据中出现网状结构关系都需要图计算。
GraphX是一个新的Spark API,它用于图和分布式图(graph-parallel)的计算。GraphX通过引入弹性分布式属性图(Resilient Distributed Property Graph): 顶点和边均有属性的有向多重图,来扩展Spark RDD。为了支持图计算,GraphX开发了一组基本的功能操作以及一个优化过的Pregel API。另外,GraphX也包含了一个快速增长的图算法和图builders的集合,用以简化图分析任务。
从社交网络到语言建模,不断增长的数据规模以及图形数据的重要性已经推动了许多新的分布式图系统的发展。 通过限制计算类型以及引入新的技术来切分和分配图,这些系统可以高效地执行复杂的图形算法,比一般的分布式数据计算(data-parallel,如spark、MapReduce)快很多。
分布式图(graph-parallel)计算和分布式数据(data-parallel)计算类似,分布式数据计算采用了一种record-centric(以记录为中心)的集合视图,而分布式图计算采用了一种vertex-centric(以顶点为中心)的图视图。 分布式数据计算通过同时处理独立的数据来获得并发的目的,分布式图计算则是通过对图数据进行分区(即切分)来获得并发的目的。更准确的说,分布式图计算递归地定义特征的转换函数(这种转换函数作用于邻居特征),通过并发地执行这些转换函数来获得并发的目的。
分布式图计算比分布式数据计算更适合图的处理,但是在典型的图处理流水线中,它并不能很好地处理所有操作。例如,虽然分布式图系统可以很好的计算PageRank等算法,但是它们不适合从不同的数据源构建图或者跨过多个图计算特征。 更准确的说,分布式图系统提供的更窄的计算视图无法处理那些构建和转换图结构以及跨越多个图的需求。分布式图系统中无法提供的这些操作需要数据在图本体之上移动并且需要一个图层面而不是单独的顶点或边层面的计算视图。例如,我们可能想限制我们的分析到几个子图上,然后比较结果。 这不仅需要改变图结构,还需要跨多个图计算。
二、图的术语
- 顶点(Vertex)
- 边(Edge)
Graph={V,E} 集合V={v1,v2,v3} 集合E={(v1,v2),(v1,v3),(v2,v3)}
- 有向图
- 无向图
- 有环图 包含一系列顶点连接的回路 (环路)
- 无环图 DAG 即为有向
- 度 : 一个顶点所有边的数量
- 出度:指从当前顶点指向其他顶点的边的数量
- 入度:其他顶点指向当前顶点的边的数量
三、图的经典表示法
邻接矩阵
四、GraphX 核心抽象
五、GraphX API
创建 Graph
六、属性图应用示例
import org.apache.spark.graphx._ val verticeArray=Array((1L,("zhangsan",32)),(2L,("lisi",28)),(3L,("wangwu",36)),(4L,("zhaoliu",80)),(5L,("songqi",24)),(6L,("yuba",12))) val edgeArray=Array(Edge(4L,1L,1),Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3),Edge(3L,6L,3)) val vertsRdd=sc.parallelize(verticeArray) val edgeRdd=sc.parallelize(edgeArray) val graph = Graph(vertsRdd,edgeRdd) scala> graph.vertices.filter(p=>p._2._2>30).collect res2: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(zhaoliu,80)), (1,(zhangsan,32)), (3,(wangwu,36))) // 偏函数 scala> graph.vertices.filter{case (id,(name,age))=>age>30}.collect res3: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(zhaoliu,80)), (1,(zhangsan,32)), (3,(wangwu,36))) scala> graph.vertices.collect {case (id,(name,age))=>age>30}.collect res4: Array[Boolean] = Array(true, true, false, true, false, false) // 权重 scala> graph.triplets.collect res10: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((2,(lisi,28)),(1,(zhangsan,32)),7), ((2,(lisi,28)),(4,(zhaoliu,80)),2), ((3,(wangwu,36)),(2,(lisi,28)),4), ((3,(wangwu,36)),(6,(yuba,12)),3), ((4,(zhaoliu,80)),(1,(zhangsan,32)),1), ((5,(songqi,24)),(2,(lisi,28)),2), ((5,(songqi,24)),(3,(wangwu,36)),8), ((5,(songqi,24)),(6,(yuba,12)),3)) // 假设打CALL5次,找出真爱 // srcAttr --> 源顶点属性值 // attr --> 边的属性值权重 // dstAttr --> 目标顶点属性值 scala> graph.triplets.map(x=>x.srcAttr).collect res11: Array[(String, Int)] = Array((lisi,28), (lisi,28), (wangwu,36), (wangwu,36), (zhaoliu,80), (songqi,24), (songqi,24), (songqi,24)) scala> graph.triplets.map(x=>x.dstAttr).collect res12: Array[(String, Int)] = Array((zhangsan,32), (zhaoliu,80), (lisi,28), (yuba,12), (zhangsan,32), (lisi,28), (wangwu,36), (yuba,12)) scala> graph.triplets.map(x=>x.attr).collect res13: Array[Int] = Array(7, 2, 4, 3, 1, 2, 8, 3) scala> graph.triplets.filter (x=>x.attr>=5).foreach(x=>println(x.srcAttr._1+" like "+x.dstAttr._1)) lisi like zhangsan songqi like wangwu
// 图 // vert.txt 1,南京,40 2,天津,12 3,北京,15 4,上海,38 5,杭州,23 // edge.txt 1,2,500 1,3,600 2,4,600 3,2,100 3,4,1000 3,5,800 import org.apache.spark.graphx._ // RDD 读文件 // 点 val vertsRDD = sc.textFile("file:///opt/soft/data/3/vert.txt").map(x=>{val e = x.split(",");(e(0).toLong,(e(1),e(2).toInt))}) vertsRDD.foreach(println) (1,(南京,40)) (2,(天津,12)) (3,(北京,15)) (4,(上海,38)) (5,(杭州,23)) // 边 val edgesRDD = sc.textFile("file:///opt/soft/data/3/edge.txt").map(x=>{val e=x.split(",");Edge(e(0).toLong,e(1).toLong,e(2).toInt)}) edgesRDD.foreach(println) Edge(1,2,500) Edge(1,3,600) Edge(2,4,600) Edge(3,2,100) Edge(3,4,1000) Edge(3,5,800) // 生成图 val graph = Graph(vertsRDD,edgesRDD) graph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@356de111 // 每个点出度和入度 总度数 scala> graph.degrees.collect res2: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (1,2), (3,4), (5,1), (2,3)) // 边 scala> graph.numEdges res3: Long = 6 // 点 scala> graph.numVertices res4: Long = 5 // 入度 scala> graph.inDegrees.collect res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (3,1), (5,1), (2,2)) // 出度 scala> graph.outDegrees.collect res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,2), (3,3), (2,1)) // 点边关系 scala> graph.triplets.collect res7: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,40)),(2,(天津,12)),500), ((1,(南京,40)),(3,(北京,15)),600), ((2,(天津,12)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,12)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800)) // 边 edges srcId:出发点id dstId:目标点id attr:权重 scala> graph.edges.map(x=>(x.srcId,x.dstId,x.attr+50)).collect res9: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)] = Array((1,2,550), (1,3,650), (2,4,650), (3,2,150), (3,4,1050), (3,5,850)) // 点 vertices scala> graph.vertices.map(x=>(x._1,(x._2._1,x._2._2+3))).collect res10: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,41)), (1,(南京,43)), (3,(北京,18)), (5,(杭州,26)), (2,(天津,15))) // 图 triplets scala> graph.triplets.map(x=>((x.srcId,(x.srcAttr._1,x.srcAttr._2+3)),(x.dstId,(x.dstAttr._1,x.dstAttr._2+3)),x.attr+50)).collect res15: Array[((org.apache.spark.graphx.VertexId, (String, Int)), (org.apache.spark.graphx.VertexId, (String, Int)), Int)] = Array(((1,(南京,43)),(2,(天津,15)),550), ((1,(南京,43)),(3,(北京,18)),650), ((2,(天津,15)),(4,(上海,41)),650), ((3,(北京,18)),(2,(天津,15)),150), ((3,(北京,18)),(4,(上海,41)),1050), ((3,(北京,18)),(5,(杭州,26)),850))