【Spark】(九)Spark GraphX 图计算解析1

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
全局流量管理 GTM,标准版 1个月
简介: 【Spark】(九)Spark GraphX 图计算解析1

文章目录


前言:为什么需要图计算

一、Spark GraphX 概述

二、图的术语

三、图的经典表示法

四、GraphX 核心抽象

五、GraphX API

六、属性图应用示例


七、图的算子

1、属性算子

2、结构算子

3、Join 算子


八、GraphX API 的应用


前言:为什么需要图计算


  • 许多大数据以大规模图或网络的形式呈现


  • 许多非图结构的大数据,常会被转换为图模型进行分析


  • 图数据结构很好地表达了数据之间的关联性


一、Spark GraphX 概述


  • 图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构。


Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。


那么什么是图,都计算些什么?众所周知社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博和微信等,数据中出现网状结构关系都需要图计算。


GraphX是一个新的Spark API,它用于图和分布式图(graph-parallel)的计算。GraphX通过引入弹性分布式属性图(Resilient Distributed Property Graph): 顶点和边均有属性的有向多重图,来扩展Spark RDD。为了支持图计算,GraphX开发了一组基本的功能操作以及一个优化过的Pregel API。另外,GraphX也包含了一个快速增长的图算法和图builders的集合,用以简化图分析任务。


从社交网络到语言建模,不断增长的数据规模以及图形数据的重要性已经推动了许多新的分布式图系统的发展。 通过限制计算类型以及引入新的技术来切分和分配图,这些系统可以高效地执行复杂的图形算法,比一般的分布式数据计算(data-parallel,如spark、MapReduce)快很多。


image.png


分布式图(graph-parallel)计算和分布式数据(data-parallel)计算类似,分布式数据计算采用了一种record-centric(以记录为中心)的集合视图,而分布式图计算采用了一种vertex-centric(以顶点为中心)的图视图。 分布式数据计算通过同时处理独立的数据来获得并发的目的,分布式图计算则是通过对图数据进行分区(即切分)来获得并发的目的。更准确的说,分布式图计算递归地定义特征的转换函数(这种转换函数作用于邻居特征),通过并发地执行这些转换函数来获得并发的目的。


分布式图计算比分布式数据计算更适合图的处理,但是在典型的图处理流水线中,它并不能很好地处理所有操作。例如,虽然分布式图系统可以很好的计算PageRank等算法,但是它们不适合从不同的数据源构建图或者跨过多个图计算特征。 更准确的说,分布式图系统提供的更窄的计算视图无法处理那些构建和转换图结构以及跨越多个图的需求。分布式图系统中无法提供的这些操作需要数据在图本体之上移动并且需要一个图层面而不是单独的顶点或边层面的计算视图。例如,我们可能想限制我们的分析到几个子图上,然后比较结果。 这不仅需要改变图结构,还需要跨多个图计算。


image.png


二、图的术语


  • 顶点(Vertex)
  • 边(Edge)
Graph={V,E}
集合V={v1,v2,v3}
集合E={(v1,v2),(v1,v3),(v2,v3)}


image.png


  • 有向图
  • 无向图

image.png

  • 有环图 包含一系列顶点连接的回路 (环路)
  • 无环图 DAG 即为有向

image.png



  • 度 : 一个顶点所有边的数量
  • 出度:指从当前顶点指向其他顶点的边的数量
  • 入度:其他顶点指向当前顶点的边的数量


image.png


三、图的经典表示法


邻接矩阵


image.png


四、GraphX 核心抽象


image.png



五、GraphX API


创建 Graph


image.png


六、属性图应用示例



image.png


import org.apache.spark.graphx._
val verticeArray=Array((1L,("zhangsan",32)),(2L,("lisi",28)),(3L,("wangwu",36)),(4L,("zhaoliu",80)),(5L,("songqi",24)),(6L,("yuba",12)))
val edgeArray=Array(Edge(4L,1L,1),Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3),Edge(3L,6L,3))
val vertsRdd=sc.parallelize(verticeArray)
val edgeRdd=sc.parallelize(edgeArray)
val graph = Graph(vertsRdd,edgeRdd)
scala> graph.vertices.filter(p=>p._2._2>30).collect
res2: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(zhaoliu,80)), (1,(zhangsan,32)), (3,(wangwu,36)))
// 偏函数
scala> graph.vertices.filter{case (id,(name,age))=>age>30}.collect
res3: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(zhaoliu,80)), (1,(zhangsan,32)), (3,(wangwu,36)))
scala> graph.vertices.collect {case (id,(name,age))=>age>30}.collect
res4: Array[Boolean] = Array(true, true, false, true, false, false)
// 权重
scala> graph.triplets.collect
res10: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((2,(lisi,28)),(1,(zhangsan,32)),7), ((2,(lisi,28)),(4,(zhaoliu,80)),2), ((3,(wangwu,36)),(2,(lisi,28)),4), ((3,(wangwu,36)),(6,(yuba,12)),3), ((4,(zhaoliu,80)),(1,(zhangsan,32)),1), ((5,(songqi,24)),(2,(lisi,28)),2), ((5,(songqi,24)),(3,(wangwu,36)),8), ((5,(songqi,24)),(6,(yuba,12)),3))
// 假设打CALL5次,找出真爱
// srcAttr --> 源顶点属性值
// attr --> 边的属性值权重
// dstAttr --> 目标顶点属性值
scala> graph.triplets.map(x=>x.srcAttr).collect
res11: Array[(String, Int)] = Array((lisi,28), (lisi,28), (wangwu,36), (wangwu,36), (zhaoliu,80), (songqi,24), (songqi,24), (songqi,24))
scala> graph.triplets.map(x=>x.dstAttr).collect
res12: Array[(String, Int)] = Array((zhangsan,32), (zhaoliu,80), (lisi,28), (yuba,12), (zhangsan,32), (lisi,28), (wangwu,36), (yuba,12))
scala> graph.triplets.map(x=>x.attr).collect
res13: Array[Int] = Array(7, 2, 4, 3, 1, 2, 8, 3)
scala> graph.triplets.filter (x=>x.attr>=5).foreach(x=>println(x.srcAttr._1+" like "+x.dstAttr._1))
lisi like zhangsan
songqi like wangwu


image.png


// 图
// vert.txt
1,南京,40
2,天津,12
3,北京,15
4,上海,38
5,杭州,23
// edge.txt
1,2,500
1,3,600
2,4,600
3,2,100
3,4,1000
3,5,800
import org.apache.spark.graphx._
// RDD 读文件 
// 点
val vertsRDD = sc.textFile("file:///opt/soft/data/3/vert.txt").map(x=>{val e = x.split(",");(e(0).toLong,(e(1),e(2).toInt))})
vertsRDD.foreach(println)
(1,(南京,40))
(2,(天津,12))
(3,(北京,15))
(4,(上海,38))
(5,(杭州,23))
// 边
val edgesRDD = sc.textFile("file:///opt/soft/data/3/edge.txt").map(x=>{val e=x.split(",");Edge(e(0).toLong,e(1).toLong,e(2).toInt)})
edgesRDD.foreach(println)
Edge(1,2,500)
Edge(1,3,600)
Edge(2,4,600)
Edge(3,2,100)
Edge(3,4,1000)
Edge(3,5,800)
// 生成图 
val graph = Graph(vertsRDD,edgesRDD)
graph: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@356de111
// 每个点出度和入度 总度数
scala> graph.degrees.collect
res2: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (1,2), (3,4), (5,1), (2,3))
// 边
scala> graph.numEdges
res3: Long = 6
// 点
scala> graph.numVertices
res4: Long = 5
// 入度
scala> graph.inDegrees.collect
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (3,1), (5,1), (2,2))
// 出度
scala> graph.outDegrees.collect
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,2), (3,3), (2,1))
// 点边关系
scala> graph.triplets.collect
res7: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,40)),(2,(天津,12)),500), ((1,(南京,40)),(3,(北京,15)),600), ((2,(天津,12)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,12)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))
// 边 edges        srcId:出发点id    dstId:目标点id   attr:权重
scala> graph.edges.map(x=>(x.srcId,x.dstId,x.attr+50)).collect
res9: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)] = Array((1,2,550), (1,3,650), (2,4,650), (3,2,150), (3,4,1050), (3,5,850))
// 点 vertices
scala> graph.vertices.map(x=>(x._1,(x._2._1,x._2._2+3))).collect
res10: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,41)), (1,(南京,43)), (3,(北京,18)), (5,(杭州,26)), (2,(天津,15)))
// 图 triplets
scala> graph.triplets.map(x=>((x.srcId,(x.srcAttr._1,x.srcAttr._2+3)),(x.dstId,(x.dstAttr._1,x.dstAttr._2+3)),x.attr+50)).collect
res15: Array[((org.apache.spark.graphx.VertexId, (String, Int)), (org.apache.spark.graphx.VertexId, (String, Int)), Int)] = Array(((1,(南京,43)),(2,(天津,15)),550), ((1,(南京,43)),(3,(北京,18)),650), ((2,(天津,15)),(4,(上海,41)),650), ((3,(北京,18)),(2,(天津,15)),150), ((3,(北京,18)),(4,(上海,41)),1050), ((3,(北京,18)),(5,(杭州,26)),850))


目录
相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
75 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
1月前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
67 2
|
2月前
|
存储 固态存储 安全
阿里云服务器X86计算架构解析与X86计算架构云服务器收费价格参考
阿里云服务器架构分为X86计算、Arm计算、高性能计算等多种架构,其中X86计算是用户选择最多的一种架构,本文将深入探讨阿里云X86计算架构的云服务器,包括其技术特性、适用场景、性能优势以及最新价格情况。
|
2月前
|
编解码 弹性计算 应用服务中间件
阿里云服务器Arm计算架构解析:Arm计算架构云服务器租用收费标准价格参考
阿里云服务器架构分为X86计算、Arm计算、高性能计算等多种架构,其中Arm计算架构以其低功耗、高效率的特点受到广泛关注。本文将深入解析阿里云Arm计算架构云服务器的技术特点、适用场景以及包年包月与按量付费的收费标准与最新活动价格情况,以供选择参考。
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
58 0
|
2月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
61 0
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
77 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
95 0

推荐镜像

更多