Pregel模型原理

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
大数据开发治理平台 DataWorks,不限时长
简介: Pregel模型原理

一.图简介

图数据结构可以很好的展示关联关系,关联性计算是大数据计算核心,Pregel是一种基于BSP模型实现的并行图处理系统。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算。

二.模型

2.1 简介

Pregel计算模型,输入是一个有向图,该有向图每个顶点(Vertex)是唯一,每个顶点都有属性,这些属性可以被修改,其初始值由用户定义,边也有属性,与顶点关联。

消息(Message),消息是Pregel计算模型的核心。每个 Vertex 在初始状态以及之后的每一个计算步骤当中都被 Attach 一个 Message 值作为 Vertex 当前的状态,算法的迭代通过 Vertex 之间互相发送的消息来完成。

超迭代(Superstep)。一个 Superstep 是 Pregel 在执行算法过程当中进行的一次迭代。 一次 Pregel 计算过程可能包括多个 Superstep。

2.2 Vertex

Pregel图计算过程与MapReduce非常接近:在迭代每一个步骤,将会以图的节点为中心进行Map操作,这意味Map函数当中我们只有以某一节点为中心的局部信息,包括:

  • 一个 Vertex 和它当前 Attach 的 Message
  • 当前 Vertex 向外指向的 Edge 和 Edge 上的属性
  • 当前 Vertex 在上一步计算当中所接收到的全部 Message

Vertex状态变化

当一个Vertex在上上一步当中没有接收到消息,或者算法自己决定不在向外发送消息,它可以被转变为Inactive。在 Pregel 的术语当中,这被称为 Vote to halt。

当一个在之前已经 Inactive 的 Vertex 又接受到一条新的消息,它会在新的计算中转变为 Active 的状态。

在大多数算法当中,所有的 Vertex 都进入 Inactive 状态就意味着算法结束。

2.3 优化

Message网络传输

在有些情况下,我们可以在 Message 发送前先对他们进行一步聚合。比方说, 在算法中我们只关心接收到消息的最大值,那么与其把所有消息都发送到目的地再计算, 不如先将最大值求出,这样可以极大地减少需要发送的消息数量。 Pregel 允许用户自定义 Combiner 来实现这一目的。

三.系统设计

3.1 图分区

图分区的方法十分容易理解,前文提到 Pregel 是以 Vertex 为中心的计算模型, 因此在分区的时候也是以 Vertex 为中心。 当一个节点被划分到一个区,与之相连的局部信息(边、边属性、消息)也都会被分配到这个区上。 由于对图进行分区的函数是全局一致的,各个计算节点对消息的转发并不需要通过某一中心服务进行协调。

默认的分区方法就是对 VertexId 的 Hash 值进行取模操作。用户也可以自定义分区函数以增强分区的局部性(Locality), 减少计算节点之间的网络流量。

3.2 主从模型

Pregel 在分布式系统当中的任务调度是简单的主从模型。每个计算任务有一个 Master 进程协调所有计算,在每个 Superstep 当中,Master 会决定图分区、发送 RPC 调用到 Worker 节点激发任务以及监控任务完成。所有图分区都在 Worker 上,Master 不管理任何图分区。 不过,Pregel 的 Aggregator 运行在 Master 上。因此 Worker 需要将 Aggregator 所需信息发送到 Master 上进行聚合。

3.3 Message 缓冲

Message 缓冲是在计算节点(Worker)的层面上提高吞吐量的一个优化。 Message 在 Worker 之间传递时并不是来一个发一个,而是通过缓冲积攒一些 Message,之后以 Batch 的形式批量发送。 这一优化可以减少网络请求的 Overhead。

3.4 Checkpoint 和 Confined Recovery

Checkpoint 在 Superstep 执行前进行,用来保存当前系统的状态。当某一图分区计算失败但 Worker 仍然可用时, 可以从 Checkpoint 执行快速恢复

当某一 Worker 整体失败当机使得它所记录的全部状态丢失时,新启动的 Worker 可能要重新接收上一步发送出来的消息。 为了避免无限制的递归重新计算之前的步骤,Pregel 将 Worker 将要发送的每一条消息写入 Write Ahead Log。 这种方法被称为 Confined Recovery

四.源码

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
//要求最大迭代数大于0,不然报错。
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
//第一次迭代,对每个节点用vprog函数计算。
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 根据发送、聚合信息的函数计算下次迭代用的信息。
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
//数一下还有多少节点活跃
var activeMessages = messages.count()
// 下面进入循环迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接受消息并更新节点信息
prevG = g
g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
/*iteration这里用mapReduceTriplets实现消息的发送和聚合。mapReduceTriplets的*参数中有一个map方法和一个reduce方法,这里的*sendMsg就是map方法,*mergeMsg就是reduce方法
*/
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply
目录
相关文章
|
2月前
|
前端开发
iStack详解(一)——iStack基本原理
iStack详解(一)——iStack基本原理
83 4
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
什么是大模型?了解大模型的基本概念
大模型是具有大规模参数和复杂计算结构的机器学习模型,常由深度神经网络构建,用于处理如自然语言处理、计算机视觉等复杂任务。与小模型相比,大模型有更强的表达能力和准确性,但需要更多计算资源。大模型的发展经历了从传统神经网络到Transformer架构的演变,如GPT和BERT等,近年来以ChatGPT为代表的大模型在交互式对话和内容生成方面取得了重大突破。大模型的特点包括大规模、涌现能力、高性能和泛化能力,以及在多任务学习、自动化和跨领域应用中的优势。根据输入数据类型和应用领域,大模型可分为语言、视觉和多模态等类别,并通过微调适应特定任务。
|
2月前
|
运维 监控 NoSQL
RedisShake的基本原理
RedisShake的基本原理
180 0
|
12月前
|
机器学习/深度学习
总结机器学习中7种离散特征编码方式优缺点
整理总结对比了7种机器学习离散特征编码方式的优缺点
175 0
|
机器学习/深度学习 人工智能 算法
一文搞懂模型量化算法基础
一文搞懂模型量化算法基础
3295 0
|
机器学习/深度学习 算法 数据可视化
详细解读GraphFPN | 如何用图模型提升目标检测模型性能?
详细解读GraphFPN | 如何用图模型提升目标检测模型性能?
157 0
|
机器学习/深度学习 数据可视化 PyTorch
【Pytorch神经网络实战案例】14 构建条件变分自编码神经网络模型生成可控Fashon-MNST模拟数据
变分自编码神经网络在变分自编码神经网络基础之上,增加指导性条件。在编码阶段的输入端添加了与标签对应的特征,在解码阶段同样添加标签特征。这样,最终得到的模型将会把输入的标签特征当成原始数据的一部分,实现通过标签来生成可控模拟数据的效果。
102 0
|
机器学习/深度学习 算法 开发者
模型原理-1| 学习笔记
快速学习模型原理-1。
120 0
模型原理-1| 学习笔记
|
机器学习/深度学习 算法 开发者
模型原理-2| 学习笔记
快速学习模型原理-2。
91 0
模型原理-2| 学习笔记
|
缓存 Kubernetes 负载均衡
K8S原理剖析:网络模型原理剖析和实践
K8S原理剖析:网络模型原理剖析和实践
233 0
K8S原理剖析:网络模型原理剖析和实践