Pregel模型原理

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: Pregel模型原理

一.图简介

图数据结构可以很好的展示关联关系,关联性计算是大数据计算核心,Pregel是一种基于BSP模型实现的并行图处理系统。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算。

二.模型

2.1 简介

Pregel计算模型,输入是一个有向图,该有向图每个顶点(Vertex)是唯一,每个顶点都有属性,这些属性可以被修改,其初始值由用户定义,边也有属性,与顶点关联。

消息(Message),消息是Pregel计算模型的核心。每个 Vertex 在初始状态以及之后的每一个计算步骤当中都被 Attach 一个 Message 值作为 Vertex 当前的状态,算法的迭代通过 Vertex 之间互相发送的消息来完成。

超迭代(Superstep)。一个 Superstep 是 Pregel 在执行算法过程当中进行的一次迭代。 一次 Pregel 计算过程可能包括多个 Superstep。

2.2 Vertex

Pregel图计算过程与MapReduce非常接近:在迭代每一个步骤,将会以图的节点为中心进行Map操作,这意味Map函数当中我们只有以某一节点为中心的局部信息,包括:

  • 一个 Vertex 和它当前 Attach 的 Message
  • 当前 Vertex 向外指向的 Edge 和 Edge 上的属性
  • 当前 Vertex 在上一步计算当中所接收到的全部 Message

Vertex状态变化

当一个Vertex在上上一步当中没有接收到消息,或者算法自己决定不在向外发送消息,它可以被转变为Inactive。在 Pregel 的术语当中,这被称为 Vote to halt。

当一个在之前已经 Inactive 的 Vertex 又接受到一条新的消息,它会在新的计算中转变为 Active 的状态。

在大多数算法当中,所有的 Vertex 都进入 Inactive 状态就意味着算法结束。

2.3 优化

Message网络传输

在有些情况下,我们可以在 Message 发送前先对他们进行一步聚合。比方说, 在算法中我们只关心接收到消息的最大值,那么与其把所有消息都发送到目的地再计算, 不如先将最大值求出,这样可以极大地减少需要发送的消息数量。 Pregel 允许用户自定义 Combiner 来实现这一目的。

三.系统设计

3.1 图分区

图分区的方法十分容易理解,前文提到 Pregel 是以 Vertex 为中心的计算模型, 因此在分区的时候也是以 Vertex 为中心。 当一个节点被划分到一个区,与之相连的局部信息(边、边属性、消息)也都会被分配到这个区上。 由于对图进行分区的函数是全局一致的,各个计算节点对消息的转发并不需要通过某一中心服务进行协调。

默认的分区方法就是对 VertexId 的 Hash 值进行取模操作。用户也可以自定义分区函数以增强分区的局部性(Locality), 减少计算节点之间的网络流量。

3.2 主从模型

Pregel 在分布式系统当中的任务调度是简单的主从模型。每个计算任务有一个 Master 进程协调所有计算,在每个 Superstep 当中,Master 会决定图分区、发送 RPC 调用到 Worker 节点激发任务以及监控任务完成。所有图分区都在 Worker 上,Master 不管理任何图分区。 不过,Pregel 的 Aggregator 运行在 Master 上。因此 Worker 需要将 Aggregator 所需信息发送到 Master 上进行聚合。

3.3 Message 缓冲

Message 缓冲是在计算节点(Worker)的层面上提高吞吐量的一个优化。 Message 在 Worker 之间传递时并不是来一个发一个,而是通过缓冲积攒一些 Message,之后以 Batch 的形式批量发送。 这一优化可以减少网络请求的 Overhead。

3.4 Checkpoint 和 Confined Recovery

Checkpoint 在 Superstep 执行前进行,用来保存当前系统的状态。当某一图分区计算失败但 Worker 仍然可用时, 可以从 Checkpoint 执行快速恢复

当某一 Worker 整体失败当机使得它所记录的全部状态丢失时,新启动的 Worker 可能要重新接收上一步发送出来的消息。 为了避免无限制的递归重新计算之前的步骤,Pregel 将 Worker 将要发送的每一条消息写入 Write Ahead Log。 这种方法被称为 Confined Recovery

四.源码

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
//要求最大迭代数大于0,不然报错。
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
//第一次迭代,对每个节点用vprog函数计算。
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 根据发送、聚合信息的函数计算下次迭代用的信息。
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
//数一下还有多少节点活跃
var activeMessages = messages.count()
// 下面进入循环迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接受消息并更新节点信息
prevG = g
g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
/*iteration这里用mapReduceTriplets实现消息的发送和聚合。mapReduceTriplets的*参数中有一个map方法和一个reduce方法,这里的*sendMsg就是map方法,*mergeMsg就是reduce方法
*/
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply
目录
相关文章
|
6月前
|
机器学习/深度学习 Java 网络架构
YOLOv5改进 | TripletAttention三重注意力机制(附代码+机制原理+添加教程)
YOLOv5改进 | TripletAttention三重注意力机制(附代码+机制原理+添加教程)
337 0
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
CNN中的注意力机制综合指南:从理论到Pytorch代码实现
注意力机制已成为深度学习模型的关键组件,尤其在卷积神经网络(CNN)中发挥了重要作用。通过使模型关注输入数据中最相关的部分,注意力机制显著提升了CNN在图像分类、目标检测和语义分割等任务中的表现。本文将详细介绍CNN中的注意力机制,包括其基本概念、不同类型(如通道注意力、空间注意力和混合注意力)以及实际实现方法。此外,还将探讨注意力机制在多个计算机视觉任务中的应用效果及其面临的挑战。无论是图像分类还是医学图像分析,注意力机制都能显著提升模型性能,并在不断发展的深度学习领域中扮演重要角色。
82 10
|
4月前
|
机器学习/深度学习 算法 Serverless
现代深度学习框架构建问题之链式法则在反向传播中的作用如何解决
现代深度学习框架构建问题之链式法则在反向传播中的作用如何解决
53 3
|
4月前
|
机器学习/深度学习 自动驾驶
大模型概念问题之谷歌的MUM模型是什么
大模型概念问题之谷歌的MUM模型是什么
|
6月前
|
前端开发
iStack详解(一)——iStack基本原理
iStack详解(一)——iStack基本原理
304 4
|
6月前
|
机器学习/深度学习 存储 人工智能
一文搞懂 Transformer 工作原理 !!
一文搞懂 Transformer 工作原理 !!
187 0
|
6月前
|
运维 监控 NoSQL
RedisShake的基本原理
RedisShake的基本原理
402 0
|
6月前
|
机器学习/深度学习 Java 网络架构
YOLOv8改进 | TripletAttention三重注意力机制(附代码+机制原理+添加教程)
YOLOv8改进 | TripletAttention三重注意力机制(附代码+机制原理+添加教程)
894 0
|
机器学习/深度学习 人工智能 算法
一文搞懂模型量化算法基础
一文搞懂模型量化算法基础
3976 0
|
机器学习/深度学习 算法 数据可视化
详细解读GraphFPN | 如何用图模型提升目标检测模型性能?
详细解读GraphFPN | 如何用图模型提升目标检测模型性能?
187 0