Giraph源码分析(四)—— Master 如何检查Worker启动成功

简介: 本文的目的说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。环境在单机上(机器名:giraphx)启动了2个workers。Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。

本文的目的

说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。

环境

在单机上(机器名:giraphx)启动了2个workers。

Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。每个Worker启动成功后,会向Master汇报自身的健康状况,那么Master是如何检测Workers是否都成功启动了?

Master在ZooKeeper上创两个目录,_workerHealthyDir和 _workerUnhealthyDir,分别用来记录Healthy Workers和UnHealthy Workers。

主要在BspServiceMaster类中的getAllWorkerInfos()方法来完成,其调用关系如下,注意下getAllWorkerInfos()到MasterThread.run()方法调用关系,比较难找。

创建的两个目录如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir

每个Worker在setup()中,调用registerHealth()方法来注册自身的状态。

若自身是Healthy的,则在_workerHealthyDir目录下添加子节点 /wokerInfo.getHostNameId(),否则在_workerUnhealthyDir目录下添加。wokerInfo.getHostNameId()为:Hostname+“_”+TaskId。 Task1和Task2 (Task 0是master) 创建的子节点如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2

Master 在checkWorkers()方法中,在While死循环中(实际有超时限制),通过调用getAllWorkerInfos()方法来获取_workerHealthyDir目录下的子节点,然后比较子节点数目是否达到maxWorkers(启动job时定义的,-w参数)。

若小于maxWorkers,则继续调用getAllWorkerInfos()方法进行下一轮检测;若等于maxWorker,退出While循环,然后返回healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] 。

问题:由于在分布式环境中,每个Worker和Maste都是并行运行,彼此不知道对方的运行情况。上述第3步骤中,若还有子节点还没有创建,就一直在while死循环中调用来检测getAllWorkerInfos()方法检测,效率比较低下,当然也比较笨!

Giraph借用ZooKeeper来高效的进行检测。设计理念如下:

  1. master在获取子节点时,注册Watcher(为注册器,用于触发相应事件)。

若某个task创建了子节点后,就会触发Watcher事件。

若子节点数目小于maxWorkers,就调用 workerHealthRegistrationChanged的await()方法释放当前线程的锁,陷入等待状态。不会进行无用的检测。

说明:workerHealthRegistrationChanged为PredicateLock类型(implements BspEvent接口),PredicateLock里面使用可重入锁 ReentrantLock和Condition进行线程的控制。

当某个task创建了子节点后,触发Watcher事件。

调用BspService中的public final void Process(WatchedEvent event)事件,该方法根据事件的路径来激活相应的BspEvent事件。此处对应的是:

实验运行如下:

s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy )

这样就会激活master线程,开始下一轮检测。

子节点数目等于maxWorkers时,就停止。

总结:每创建一个子节点时,才会进行一次检测,效率较高!

相关文章
|
4月前
|
网络协议 小程序 物联网
Gateway-Worker启动失败或者启动无法正常使用的几种方法
Workerman是一款开源高性能异步PHP socket即时通讯框架。支持高并发,超高稳定性,被广泛的用于手机app、移动通讯,微信小程序,手游服务端、网络游戏、PHP聊天室、硬件通讯、智能家居、车联网、物联网等领域的开发。 支持TCP长连接,支持Websocket、HTTP等协议,支持自定义协议
129 3
|
6月前
|
NoSQL Redis 数据库
Redis server启动后会做哪些操作?
Redis server启动后会做哪些操作?
解决es集群启动完成后报master_not_discovered_exception
解决es集群启动完成后报master_not_discovered_exception
1339 0
|
设计模式 分布式计算 Scala
Spark Master 和 Worker 项目需求 | 学习笔记
快速学习 Spark Master 和 Worker 项目需求
Spark Master 和 Worker 项目需求 | 学习笔记
|
分布式计算 安全 Scala
Master 检测心跳并删除超时的 Worker | 学习笔记
快速学习 Master 检测心跳并删除超时的 Worker
Master 检测心跳并删除超时的 Worker | 学习笔记
|
网络协议 Linux Scala
指定 Master 与 Worker 的启动参数 | 学习笔记
快速学习指定 Master 与 Worker 的启动参数
妹子问我为啥启动线程时使用 start 而不是 run
今天团队里面的妹子问阿粉,为什么在启动线程的时候,都使用 start 方法,而不是 run 方法呢 还好阿粉平时一直有在学习,要不真的被妹子问住了 在多线程中,如果想让一个线程启动,你使用的方法一定是 thread.start() 方法,而不是 thread.run() 方法(啥,你用的不是 thread.start() 方法?乖,你的打开方式不对哦,下次不要这样了 有没有疑惑,为什么每次我们都习惯调用 start() 方法,为什么不直接调用 run() 方法来启动线程呢? 而且如果去看源码的话,你会发现,在 thread.start() 方法中,其实最后还是调用了 thread.ru
妹子问我为啥启动线程时使用 start 而不是 run
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
319 0
|
NoSQL 关系型数据库 Linux