Pregel模型原理

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: Pregel模型原理

一.图简介

图数据结构可以很好的展示关联关系,关联性计算是大数据计算核心,Pregel是一种基于BSP模型实现的并行图处理系统。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算。

二.模型

2.1 简介

Pregel计算模型,输入是一个有向图,该有向图每个顶点(Vertex)是唯一,每个顶点都有属性,这些属性可以被修改,其初始值由用户定义,边也有属性,与顶点关联。

消息(Message),消息是Pregel计算模型的核心。每个 Vertex 在初始状态以及之后的每一个计算步骤当中都被 Attach 一个 Message 值作为 Vertex 当前的状态,算法的迭代通过 Vertex 之间互相发送的消息来完成。

超迭代(Superstep)。一个 Superstep 是 Pregel 在执行算法过程当中进行的一次迭代。 一次 Pregel 计算过程可能包括多个 Superstep。

2.2 Vertex

Pregel图计算过程与MapReduce非常接近:在迭代每一个步骤,将会以图的节点为中心进行Map操作,这意味Map函数当中我们只有以某一节点为中心的局部信息,包括:

  • 一个 Vertex 和它当前 Attach 的 Message
  • 当前 Vertex 向外指向的 Edge 和 Edge 上的属性
  • 当前 Vertex 在上一步计算当中所接收到的全部 Message

Vertex状态变化

当一个Vertex在上上一步当中没有接收到消息,或者算法自己决定不在向外发送消息,它可以被转变为Inactive。在 Pregel 的术语当中,这被称为 Vote to halt。

当一个在之前已经 Inactive 的 Vertex 又接受到一条新的消息,它会在新的计算中转变为 Active 的状态。

在大多数算法当中,所有的 Vertex 都进入 Inactive 状态就意味着算法结束。

2.3 优化

Message网络传输

在有些情况下,我们可以在 Message 发送前先对他们进行一步聚合。比方说, 在算法中我们只关心接收到消息的最大值,那么与其把所有消息都发送到目的地再计算, 不如先将最大值求出,这样可以极大地减少需要发送的消息数量。 Pregel 允许用户自定义 Combiner 来实现这一目的。

三.系统设计

3.1 图分区

图分区的方法十分容易理解,前文提到 Pregel 是以 Vertex 为中心的计算模型, 因此在分区的时候也是以 Vertex 为中心。 当一个节点被划分到一个区,与之相连的局部信息(边、边属性、消息)也都会被分配到这个区上。 由于对图进行分区的函数是全局一致的,各个计算节点对消息的转发并不需要通过某一中心服务进行协调。

默认的分区方法就是对 VertexId 的 Hash 值进行取模操作。用户也可以自定义分区函数以增强分区的局部性(Locality), 减少计算节点之间的网络流量。

3.2 主从模型

Pregel 在分布式系统当中的任务调度是简单的主从模型。每个计算任务有一个 Master 进程协调所有计算,在每个 Superstep 当中,Master 会决定图分区、发送 RPC 调用到 Worker 节点激发任务以及监控任务完成。所有图分区都在 Worker 上,Master 不管理任何图分区。 不过,Pregel 的 Aggregator 运行在 Master 上。因此 Worker 需要将 Aggregator 所需信息发送到 Master 上进行聚合。

3.3 Message 缓冲

Message 缓冲是在计算节点(Worker)的层面上提高吞吐量的一个优化。 Message 在 Worker 之间传递时并不是来一个发一个,而是通过缓冲积攒一些 Message,之后以 Batch 的形式批量发送。 这一优化可以减少网络请求的 Overhead。

3.4 Checkpoint 和 Confined Recovery

Checkpoint 在 Superstep 执行前进行,用来保存当前系统的状态。当某一图分区计算失败但 Worker 仍然可用时, 可以从 Checkpoint 执行快速恢复

当某一 Worker 整体失败当机使得它所记录的全部状态丢失时,新启动的 Worker 可能要重新接收上一步发送出来的消息。 为了避免无限制的递归重新计算之前的步骤,Pregel 将 Worker 将要发送的每一条消息写入 Write Ahead Log。 这种方法被称为 Confined Recovery

四.源码

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
//要求最大迭代数大于0,不然报错。
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
//第一次迭代,对每个节点用vprog函数计算。
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 根据发送、聚合信息的函数计算下次迭代用的信息。
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
//数一下还有多少节点活跃
var activeMessages = messages.count()
// 下面进入循环迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接受消息并更新节点信息
prevG = g
g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
/*iteration这里用mapReduceTriplets实现消息的发送和聚合。mapReduceTriplets的*参数中有一个map方法和一个reduce方法,这里的*sendMsg就是map方法,*mergeMsg就是reduce方法
*/
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply
目录
相关文章
|
iOS开发 MacOS
推荐几个mac分屏显示的工具
桌面分屏是我们工作中经常会用到的功能,但是Mac上自带的分屏功能非常有限,必须进入全屏才能使用,管理窗口时使用起来并不方便。本文中我们就来推荐几款好用的Mac分屏软件,让你的窗口管理更简单
|
9月前
|
存储 安全 前端开发
CC&LG实践|基于 LangGraph 一步步实现 Claude-Code 核心设计
本文旨在深入剖析 Claude-Code 的核心设计思想与关键技术实现,逆向分析其功能模块,结合 LangGraph 框架的能力,系统性地演示如何从一个最基础的 ReAct Agent 出发,逐步构建一个功能完备的简版 Claude-Code。
5229 19
CC&LG实践|基于 LangGraph 一步步实现 Claude-Code 核心设计
|
9月前
|
人工智能 安全 数据可视化
深度解析三大AI协议:MCP、ACP与A2A,看懂智能代理的通信法则
在AI代理技术快速发展的背景下,MCP、ACP和A2A三大协议成为推动AI生态协作的关键标准。MCP(模型上下文协议)为大模型提供标准化信息接口,提升AI处理外部数据的效率;ACP(代理通信协议)专注于边缘设备间的低延迟通信,实现本地系统的高效协同;A2A(代理对代理协议)则构建跨平台通信标准,打通不同AI系统的协作壁垒。三者各司其职,共同推动AI从独立工具向智能协作团队演进,提升整体智能化水平与应用灵活性。
4548 2
深度解析三大AI协议:MCP、ACP与A2A,看懂智能代理的通信法则
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
|
7月前
|
机器学习/深度学习 人工智能 前端开发
终端里的 AI 编程助手:OpenCode 使用指南
OpenCode 是开源的终端 AI 编码助手,支持 Claude、GPT-4 等模型,可在命令行完成代码编写、Bug 修复、项目重构。提供原生终端界面和上下文感知能力,适合全栈开发者和终端用户使用。
55719 11
|
边缘计算 人工智能 云计算
HIS系统的核心模块:医生工作站
医生工作站是HIS系统的核心模块,围绕诊疗流程一体化、电子病历智能化、医嘱闭环管理、辅助决策支持及移动化操作五大方面优化医疗流程。它整合患者信息,提供结构化病历生成与质控,实现医嘱全流程追踪,结合药品和诊疗知识库辅助决策,并支持多场景便捷操作。通过数据整合与智能工具,提升诊疗效率与质量,助力智慧医疗发展。
729 2
|
机器学习/深度学习 人工智能 自然语言处理
师资研修|AI技术赋能教材建设和课程开发——乌鲁木齐某教育部门
近日,TsingtaoAI派出AI专家为乌鲁木齐中职院校的教师团队,举办“AI技术赋能教材建设与课程开发”的师资研修。此次培训由TsingtaoAI的AI专家高寒和教育专家刘建老师亲自授课,面对的是来自乌鲁木齐的教育工作者,特别是中职院校的教学骨干。整个活动不仅涉及人工智能技术本身的深度解析,还深入探讨了如何将这些前沿技术高效应用于教材和课程体系的创新。
551 0
|
存储 Java 测试技术
阿里巴巴java开发手册
这篇文章是关于阿里巴巴Java开发手册的整理,内容包括编程规约、异常日志、单元测试、安全规约、MySQL数据库使用以及工程结构等方面的详细规范和建议,旨在帮助开发者编写更加规范、高效和安全的代码。

热门文章

最新文章