开发者学堂课程【Scala 核心编程 - 进阶:Master 检测心跳并删除超时的 Worker】学习笔记,与课程紧密连接,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/610/detail/9126
Master 检测心跳并删除超时的 Worker
内容介绍:
一、功能要求
二、思路分析
三、程序代码
四、总结
一、功能要求
Master 启动定时任务,定时检测注册的 worker 中有哪些 worker 没有更新心跳。对于已经超时的 worker,Master 将其从 hashmap 中删除。
二、思路分析
在之前的课程中,我们曾搭建过一个关于 Master 和 Worker 的程序关系框图,这节课我们将继续结合之前搭建的程序关系框图来理解 Master 启动定时任务的原理和过程,在该分析过程中不考虑客户端,仅分析服务端即可。
如下图:
Master 维护了很多 hashmap,而 hashmap 中有很多 worker信息。
如图上所示,workers即 所有的 map 其中管理了许多 worker,其中包含了这些worker的信息,如图中的 worker id1信息、worker id2信息、worker id3信息等。
Hashmap 与这些 workers 是相关联的。假如客户端对应的worker已经许久没有更新心跳,而 Master 管理的 worker 应有实际意义,没有心跳就失去了其意义,此时可以将其清空。
要想将其清空,Master 需要进行相关处理,即定时检测管理的 worker 的心跳是否正常。如果心跳已经超时,即处于异常状态,此时就需要删除任务。
因此,在 Master 中应有自行启动定时任务的程序,让其反复地检测worker的心跳是否正常,是否超时,我们会启动定时检测的任务。在该图上用深绿色标注定时任务定时器。
直至此时,我们所完成功能与跟客户端没有任何关系,也就是说现在检测任务只是发生在服务器端。现在的问题在于如何去启动定时器?
应该在图中“start表示Master工作”部分启动较为合适,一旦Master启动,就立即检测,从此处开始触发条件命令服务器开始执行检测任务或者命令Master启动定时任务。
三、程序代码
从左侧的项目栏里快速找到Master,代码如下:
package com.lxb.sparkmasterworker.master
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.atguigu.akka.sparkmasterworker.commen.
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
//引入一个需要的包
import scala.concurrent.duration.
Class SparkMaster extends Actor
//定义一个hm,管理workers
val workers = mutable.Map[string,WorkerInfo]()
override def receive: Receive = {
case "start" => {
println("master服务器启动了...")
//从这里开始正式编写代码
//我们曾经预设了两个协议,可以直接使用,也可以后期简化并作一步
//MessageProtocol.scala
//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令
case object RemoveTimeOutWorker
//以上协议命令是按照Spark格式书写的,可以直接简化定时出发RemoveTimeOutWorker 请求即可。这两条命令都是object,说明其中不用带有信息,只是说明应执行的任务即可。
//接下来开始书写正式的代码
self ! StartTimeOutWorker // Master发送消息给自己
case RegisterWorkerInfo(id,cpu,ram) = > {
//接收到Worker的注册信息
If(!workers.contains(id)) {
//创建workerInfo 对象
val workerInfo =new WorkerInfo(id,cpu,ram)
//加入到workers
workers + = ((id,workerInfo))
println("服务器的workers="+workers)
//回复一个消息,说注册成功
send ()!RegisterWorkerInfo
}
}
}
case HeartBeat (id) => {
//更新对应的worker的心跳时间
//从worker取出workerInfo
val workerInfo = workers(id)
workerInfo.lastHeartBeat =System.currentTimeMillis()
println (“master更新了”+ id +“心跳时间…”)
}
//该段命令与 self ! StartTimeOutWorker 承接,为了不破坏命令请求的执行,将其放在最后面。
case StartTimeOutWorker = >
println(“开始了定时检测worker心跳的任务)
//如果收到了StartTimeOutWorker 信息,则说明此时要准备开始执行检测任务。而该阶段后期可能还要执行一些单独的工作,所以与前面的代码分开来写有一定的好处
//如何定时启动?
思路与原先相同,原先用到了dispatcher转换,此处只要将原先
的代码略作修改即可。
//首先引入一个需要的包(import scala.concurrent.durati
on. 为了方便读取,放在了代码最开始)
import context.dispatcher
//说明
// 1. 0millis:表示不延时,立即执行定时器
// 2. 9000millis:表示每9秒执行一次
// 3. self:表示发给自己
// 4. RemoveTimeOutWorker发送的内容
context.system.scheduler.schedule(0 millis,9000 mil
lis,self,RemoveTimeOutWorker)
// 此处将原本的HeartBeat改为了RemoveTimeOutW
orker。
且将原本的3秒改为9秒,因为worker原本3秒更新一次数据,而如果3秒检测一次间隔时间会偏短,可以适当延长检测的时间,如9秒,这几秒钟的延迟不会影响检测结果。StartTimeOutWorker 被触发一次,定时器就启用了。
}
//启用过后,对RemoveTimeOutWorker消息进行处理
case RemoveTimeOutWorker=>{
//由于该消息是一个object ,因此可以直接写入
//这里需要检测哪些worker心跳超时,并从map中删除。这里
需要充分利用scala函数式编程的特点。
//思路:
首先我们需要考虑何谓超时,所谓超时是指此时检测的时间减去上一次数据更新时间,若这个差值超过了某一范围,即称为超时。理论上这个值只要大于3秒即为超时,但是考虑到网络延时等原因,可以将其设为6秒,且一般情况下,服务器应当在同一机房,因此这3秒的延时对于数据的整体影响不大。
也就是now - lastHeartBeat > 6000即算为超时。
// 首先将workers所有的workerinfos信息取出来(val workers = mutable.Map[string,WorkerInfo]),因为 WorkerInfo 的 id 与string相同,所以我们这里不考虑其他信息
val workerInfos = workers.values
// 获取当前时间now
val nowTime = System.currentTimeMillis()
// 先将所有的超时的workerinfo过滤出来,然后删除
workerInfos.filter(workerInfo=>(nowTime-workerInf
o.lastHeartBeat ) > 6000)) .foreach(workerInfo=>work
ers.remove(workerInfo.id))
//若条件“两个值的差值大于6000”一旦被满足,该worker就会被过滤出来删除
//该段代码可能较难理解,简言之,可以理解为workerInfos是当前map中的所有workerInfo,也就是之前图示中表示的各worker的“信息”(不包含id),
通过workerInfo=>(nowTime-workerInfo.lastHeartBeat) >
6000函数将满足该条件的“信息”过滤出来,这部分“信息”即是要准备删除的内容,且该函数中的 workerInfo是从workerInfos取出的。
foreach(workerInfo=>workers.remove(workerInfo.id))是对过滤出的信息组合为一个集合的操作,
且此处的workerInfo仅为组合的集合的名称,可以进行更改,如改为foreach(worker=>workers.remove(worker.id))亦可。
前面的函数整的workerInfo同样如此,且这两个函数中的值亦无关。
println("当前有"+ workers.size+"个 worker存活的")
}
}
}
//运行代码检查代码有无bug,当然写完整段代码无需检测客户
端,因为目前检测任务与客户端没有任何关系。且此时两个时间分
别设为了9秒和6秒,是在基于网络延时的考虑下将时间设置的
宽裕了一些,更为安全。
关闭原先的 SparkMaster 与 SparkWorker,启动 SparkMaster,检测结果为“当前有0个 worker 存活的”,9秒之后再次弹出一个检测结果。再启动SparkWorker,结果显示该 SparkWorker每3秒钟发送一次心跳。每发送3次后,显示“当前有1个 worker 存活的”。
此时若使得S parkWorker 宕机,最先显示“当前有1个 worker 存活的”,接下来的检测结果显示“当前有0个worker存活的”。当然,启动两个 Master 也可以解决该问题。
四、总结
对于 Master 检测心跳并删除超时的worker这部分,我们进行了功能说明和思路分析,最后通过代码实现了这一功能。
对于思路分析,我们只分析服务端,具体分析见本讲义中第二部分中的示意图。
对于代码实现部分,我们仅是添加了两个协议,并修改服务器端的代码。
//更新了MessageProtocol.scala,即
//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令
case object RemoveTimeOutWorker
//这里只是为了套用Spark的格式,相当于做了过渡,也可以可以直接调用RemoveTimeOut,更加简洁。
//更新了SparkMaster.scala,即增加了两部分内容:
case "start" => {
println("master服务器启动了...")
//从这里开始正式编写代码
self ! StartTimeOutWorker
}
case StartTimeOutWorker = >
println(“开始了定时检测worker心跳的任务)
import context.dispatcher
//说明
// 1. 0millis:表示不延时,立即执行定时器
// 2. 9000millis:表示每9秒执行一次
// 3. self:表示发给自己
// 4. RemoveTimeOutWorker发送的内容
context.system.scheduler.schedule(0 millis,9000 mil
lis,self,RemoveTimeOutWorker)
//对 RemoveTimeOutWorker 消息进行处理
//这里需要检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除。
case RemoveTimeOutWorker=>{
// 首先将workers所有的workerinfos信息取出 val
workerInfos = workers.values
val nowTime = System.currentTimeMillis()
// 先将所有超时的workerinfo过滤出来,然后删除
workerInfos.filter(workerInfo=>(nowTime-workerInf
o.lastHeartBeat ) > 6000)) .foreach(workerInfo=>w
orkers.remove(workerInfo.id))
println("当前有"+ workers.size+"个 worker存活的")
}
以上就是定时检测超时walker并删除任务的所有内容。