一.图简介
图数据结构可以很好的展示关联关系,关联性计算是大数据计算核心,Pregel是一种基于BSP模型实现的并行图处理系统。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算。
二.模型
2.1 简介
Pregel计算模型,输入是一个有向图,该有向图每个顶点(Vertex)是唯一,每个顶点都有属性,这些属性可以被修改,其初始值由用户定义,边也有属性,与顶点关联。
消息(Message),消息是Pregel计算模型的核心。每个 Vertex 在初始状态以及之后的每一个计算步骤当中都被 Attach 一个 Message 值作为 Vertex 当前的状态,算法的迭代通过 Vertex 之间互相发送的消息来完成。
超迭代(Superstep)。一个 Superstep 是 Pregel 在执行算法过程当中进行的一次迭代。 一次 Pregel 计算过程可能包括多个 Superstep。
2.2 Vertex
Pregel图计算过程与MapReduce非常接近:在迭代每一个步骤,将会以图的节点为中心进行Map操作,这意味Map函数当中我们只有以某一节点为中心的局部信息,包括:
- 一个 Vertex 和它当前 Attach 的 Message
- 当前 Vertex 向外指向的 Edge 和 Edge 上的属性
- 当前 Vertex 在上一步计算当中所接收到的全部 Message
Vertex状态变化
当一个Vertex在上上一步当中没有接收到消息,或者算法自己决定不在向外发送消息,它可以被转变为Inactive。在 Pregel 的术语当中,这被称为 Vote to halt。
当一个在之前已经 Inactive 的 Vertex 又接受到一条新的消息,它会在新的计算中转变为 Active 的状态。
在大多数算法当中,所有的 Vertex 都进入 Inactive 状态就意味着算法结束。
2.3 优化
Message网络传输
在有些情况下,我们可以在 Message 发送前先对他们进行一步聚合。比方说, 在算法中我们只关心接收到消息的最大值,那么与其把所有消息都发送到目的地再计算, 不如先将最大值求出,这样可以极大地减少需要发送的消息数量。 Pregel 允许用户自定义 Combiner 来实现这一目的。
三.系统设计
3.1 图分区
图分区的方法十分容易理解,前文提到 Pregel 是以 Vertex 为中心的计算模型, 因此在分区的时候也是以 Vertex 为中心。 当一个节点被划分到一个区,与之相连的局部信息(边、边属性、消息)也都会被分配到这个区上。 由于对图进行分区的函数是全局一致的,各个计算节点对消息的转发并不需要通过某一中心服务进行协调。
默认的分区方法就是对 VertexId 的 Hash 值进行取模操作。用户也可以自定义分区函数以增强分区的局部性(Locality), 减少计算节点之间的网络流量。
3.2 主从模型
Pregel 在分布式系统当中的任务调度是简单的主从模型。每个计算任务有一个 Master 进程协调所有计算,在每个 Superstep 当中,Master 会决定图分区、发送 RPC 调用到 Worker 节点激发任务以及监控任务完成。所有图分区都在 Worker 上,Master 不管理任何图分区。 不过,Pregel 的 Aggregator 运行在 Master 上。因此 Worker 需要将 Aggregator 所需信息发送到 Master 上进行聚合。
3.3 Message 缓冲
Message 缓冲是在计算节点(Worker)的层面上提高吞吐量的一个优化。 Message 在 Worker 之间传递时并不是来一个发一个,而是通过缓冲积攒一些 Message,之后以 Batch 的形式批量发送。 这一优化可以减少网络请求的 Overhead。
3.4 Checkpoint 和 Confined Recovery
Checkpoint 在 Superstep 执行前进行,用来保存当前系统的状态。当某一图分区计算失败但 Worker 仍然可用时, 可以从 Checkpoint 执行快速恢复
当某一 Worker 整体失败当机使得它所记录的全部状态丢失时,新启动的 Worker 可能要重新接收上一步发送出来的消息。 为了避免无限制的递归重新计算之前的步骤,Pregel 将 Worker 将要发送的每一条消息写入 Write Ahead Log。 这种方法被称为 Confined Recovery
四.源码
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
//要求最大迭代数大于0,不然报错。
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
//第一次迭代,对每个节点用vprog函数计算。
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 根据发送、聚合信息的函数计算下次迭代用的信息。
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
//数一下还有多少节点活跃
var activeMessages = messages.count()
// 下面进入循环迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接受消息并更新节点信息
prevG = g
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
/*iteration这里用mapReduceTriplets实现消息的发送和聚合。mapReduceTriplets的*参数中有一个map方法和一个reduce方法,这里的*sendMsg就是map方法,*mergeMsg就是reduce方法
*/
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply