【Spark】(九)Spark GraphX 图计算解析2

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 【Spark】(九)Spark GraphX 图计算解析2


七、图的算子



image.png


1、属性算子


类似于RDD的 map 操作

scala> graph.mapEdges(e=>(e.srcId,e.dstId,e.attr+50)).edges.collect
res16: Array[org.apache.spark.graphx.Edge[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)]] = Array(Edge(1,2,(1,2,550)), Edge(1,3,(1,3,650)), Edge(2,4,(2,4,650)), Edge(3,2,(3,2,150)), Edge(3,4,(3,4,1050)), Edge(3,5,(3,5,850)))
// 解构语法
scala> graph.mapVertices{case (vid,(city,template))=>(vid,(city,template+3))}.vertices.collect
res17: Array[(org.apache.spark.graphx.VertexId, (org.apache.spark.graphx.VertexId, (String, Int)))] = Array((4,(4,(上海,41))), (1,(1,(南京,43))), (3,(3,(北京,18))), (5,(5,(杭州,26))), (2,(2,(天津,15))))


2、结构算子


image.png


// subgraph 在大的图里截取小图  两个变量,存的都是函数
// vpred    高阶函数写法  
scala> graph.subgraph(vpred=(id,attr)=>attr._2>35).vertices.collect
res2: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))
// epred     
scala> graph.subgraph(epred=x=>x.attr<500).triplets.collect
res15: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((3,(北京,15)),(2,(天津,12)),100))
// 结合理解
scala> def xxx(a:Long,b:Tuple2[String,Integer]):Boolean={
     | b._2<500
     | }
xxx: (a: Long, b: (String, Integer))Boolean
scala> println(xxx(10L,("abc",300)))
true
// abc函数 当作变量传进 subgraph 函数
scala> def abc(id:VertexId,attr:Tuple2[String,Int]):Boolean={
     | attr._2>35
     | }
scala> graph.subgraph(vpred=abc).vertices.collect
res19: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((4,(上海,38)), (1,(南京,40)))


3、Join 算子


从外部的RDDs加载数据,修改顶点属性


image.png


scala> graph.triplets.collect
res20: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,40)),(2,(天津,12)),500), ((1,(南京,40)),(3,(北京,15)),600), ((2,(天津,12)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,12)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))
scala> val ngp = sc.parallelize(Array((1L,30),(2L,60)))
ngp: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:27
scala> graph.joinVertices(ngp)((id,oldval,newval)=>(oldval._1,oldval._2+newval)) 
res22: org.apache.spark.graphx.Graph[(String, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@4f134331
scala> graph.outerJoinVertices(ngp)((id,oldval,newval)=>(oldval._1,newval)).triplets.collect
res24: Array[org.apache.spark.graphx.EdgeTriplet[(String, Option[Int]),Int]] = Array(((1,(南京,Some(30))),(2,(天津,Some(60))),500), ((1,(南京,Some(30))),(3,(北京,None)),600), ((2,(天津,Some(60))),(4,(上海,None)),600), ((3,(北京,None)),(2,(天津,Some(60))),100), ((3,(北京,None)),(4,(上海,None)),1000), ((3,(北京,None)),(5,(杭州,None)),800))


八、GraphX API 的应用


  • 计算用户粉丝数量


image.png


scala> graph.mapVertices{case (id,(city,tmp))=>City(city,tmp)}.triplets.collect
res28: Array[org.apache.spark.graphx.EdgeTriplet[City,Int]] = Array(((1,City(南京,40)),(2,City(天津,12)),500), ((1,City(南京,40)),(3,City(北京,15)),600), ((2,City(天津,12)),(4,City(上海,38)),600), ((3,City(北京,15)),(2,City(天津,12)),100), ((3,City(北京,15)),(4,City(上海,38)),1000), ((3,City(北京,15)),(5,City(杭州,23)),800))
//outerJoinVertices()会join所有的点,没有匹配上的的返回None
scala> val cc = sc.parallelize(Array((1L,20),(2L,30)))
//getOrElse(0)匹配上的用,没有的返回None
scala> graph.outerJoinVertices(cc)((id,oldv,newv)=>(oldv._1,newv.getOrElse(0))).triplets.collect
res44: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,0)),600), ((2,(天津,30)),(4,(上海,0)),600), ((3,(北京,0)),(2,(天津,30)),100), ((3,(北京,0)),(4,(上海,0)),1000), ((3,(北京,0)),(5,(杭州,0)),800))
// join 只对相关联的代码进行操作
scala> graph.joinVertices(cc)((id,oldv,newv)=>(oldv._1,newv)).triplets.collect
res35: Array[org.apache.spark.graphx.EdgeTriplet[(String, Int),Int]] = Array(((1,(南京,20)),(2,(天津,30)),500), ((1,(南京,20)),(3,(北京,15)),600), ((2,(天津,30)),(4,(上海,38)),600), ((3,(北京,15)),(2,(天津,30)),100), ((3,(北京,15)),(4,(上海,38)),1000), ((3,(北京,15)),(5,(杭州,23)),800))
// 出入度统计
// 样例类
case class City(name:String,tmp:Int,inNum:Int,outNum:Int)
val find = graph.mapVertices{case (id,(city,tmp))=>City(city,tmp,0,0)}
find.joinVertices(find.inDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,newval,oldval.outNum)).joinVertices(find.outDegrees)((id,oldval,newval)=>City(oldval.name,oldval.tmp,oldval.inNum,newval)).vertices.collect
res43: Array[(org.apache.spark.graphx.VertexId, City)] = Array((4,City(上海,38,2,0)), (1,City(南京,40,0,2)), (3,City(北




目录
相关文章
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
79 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
60 3
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
80 0
|
2月前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
74 2
|
3月前
|
存储 固态存储 安全
阿里云服务器X86计算架构解析与X86计算架构云服务器收费价格参考
阿里云服务器架构分为X86计算、Arm计算、高性能计算等多种架构,其中X86计算是用户选择最多的一种架构,本文将深入探讨阿里云X86计算架构的云服务器,包括其技术特性、适用场景、性能优势以及最新价格情况。
|
3月前
|
编解码 弹性计算 应用服务中间件
阿里云服务器Arm计算架构解析:Arm计算架构云服务器租用收费标准价格参考
阿里云服务器架构分为X86计算、Arm计算、高性能计算等多种架构,其中Arm计算架构以其低功耗、高效率的特点受到广泛关注。本文将深入解析阿里云Arm计算架构云服务器的技术特点、适用场景以及包年包月与按量付费的收费标准与最新活动价格情况,以供选择参考。
|
3月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
96 0
|
3月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
66 0
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
85 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
107 0

热门文章

最新文章

推荐镜像

更多