Master 检测心跳并删除超时的 Worker | 学习笔记

简介: 快速学习 Master 检测心跳并删除超时的 Worker

开发者学堂课程【Scala 核心编程 - 进阶Master 检测心跳并删除超时的 Worker学习笔记,与课程紧密连接,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/610/detail/9126


Master 检测心跳并删除超时的 Worker


内容介绍:

一、功能要求

二、思路分析

三、程序代码

四、总结


一、功能要求

Master 启动定时任务,定时检测注册的 worker 中有哪些 worker 没有更新心跳。对于已经超时的 worker,Master 将其从 hashmap 中删除。


二、思路分析

在之前的课程中,我们曾搭建过一个关于 Master 和 Worker 的程序关系框图,这节课我们将继续结合之前搭建的程序关系框图来理解 Master 启动定时任务的原理和过程,在该分析过程中不考虑客户端,仅分析服务端即可。

如下图:

image.png

Master 维护了很多 hashmap,而 hashmap 中有很多 worker信息。

如图上所示,workers即 所有的 map 其中管理了许多 worker,其中包含了这些worker的信息,如图中的 worker id1信息、worker id2信息、worker id3信息等。

Hashmap 与这些 workers 是相关联的。假如客户端对应的worker已经许久没有更新心跳,而 Master 管理的 worker 应有实际意义,没有心跳就失去了其意义,此时可以将其清空。

要想将其清空,Master 需要进行相关处理,即定时检测管理的 worker 的心跳是否正常。如果心跳已经超时,即处于异常状态,此时就需要删除任务。

因此,在 Master 中应有自行启动定时任务的程序,让其反复地检测worker的心跳是否正常,是否超时,我们会启动定时检测的任务。在该图上用深绿色标注定时任务定时器。

直至此时,我们所完成功能与跟客户端没有任何关系,也就是说现在检测任务只是发生在服务器端。现在的问题在于如何去启动定时器?

应该在图中“start表示Master工作”部分启动较为合适,一旦Master启动,就立即检测,从此处开始触发条件命令服务器开始执行检测任务或者命令Master启动定时任务。


三、程序代码

从左侧的项目栏里快速找到Master,代码如下:

package com.lxb.sparkmasterworker.master

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

import com.atguigu.akka.sparkmasterworker.commen.

import com.typesafe.config.ConfigFactory

import scala.collection.mutable

//引入一个需要的包

import scala.concurrent.duration.

Class SparkMaster extends Actor

//定义一个hm,管理workers

val workers = mutable.Map[string,WorkerInfo]()

override def receive: Receive = {

case "start" =>  {

println("master服务器启动了...")

//从这里开始正式编写代码

//我们曾经预设了两个协议,可以直接使用,也可以后期简化并作一步

//MessageProtocol.scala

//master给自己发送一个触发检查超时worker的信息

case object StartTimeOutWorker

//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令

case object RemoveTimeOutWorker

//以上协议命令是按照Spark格式书写的,可以直接简化定时出发RemoveTimeOutWorker 请求即可。这两条命令都是object,说明其中不用带有信息,只是说明应执行的任务即可。

//接下来开始书写正式的代码

self  ! StartTimeOutWorker  // Master发送消息给自己

case RegisterWorkerInfo(id,cpu,ram) = > {

//接收到Worker的注册信息

If(!workers.contains(id)) {

//创建workerInfo 对象

val workerInfo =new WorkerInfo(id,cpu,ram)

//加入到workers

workers + = ((id,workerInfo))

println("服务器的workers="+workers)

//回复一个消息,说注册成功

send ()!RegisterWorkerInfo

}

}

}

case HeartBeat (id) => {

//更新对应的worker的心跳时间

//从worker取出workerInfo

val  workerInfo = workers(id)

workerInfo.lastHeartBeat =System.currentTimeMillis()

println (“master更新了”+ id +“心跳时间…”)

}

//该段命令与 self  ! StartTimeOutWorker 承接,为了不破坏命令请求的执行,将其放在最后面。

case StartTimeOutWorker = >

println(“开始了定时检测worker心跳的任务)

//如果收到了StartTimeOutWorker 信息,则说明此时要准备开始执行检测任务。而该阶段后期可能还要执行一些单独的工作,所以与前面的代码分开来写有一定的好处

//如何定时启动?

思路与原先相同,原先用到了dispatcher转换,此处只要将原先

的代码略作修改即可。

//首先引入一个需要的包(import scala.concurrent.durati

on. 为了方便读取,放在了代码最开始)

import context.dispatcher

//说明

// 1.  0millis:表示不延时,立即执行定时器

// 2.   9000millis:表示每9秒执行一次

// 3.   self:表示发给自己

// 4.  RemoveTimeOutWorker发送的内容

context.system.scheduler.schedule(0 millis,9000 mil

lis,self,RemoveTimeOutWorker)        

// 此处将原本的HeartBeat改为了RemoveTimeOutW

orker。

且将原本的3秒改为9秒,因为worker原本3秒更新一次数据,而如果3秒检测一次间隔时间会偏短,可以适当延长检测的时间,如9秒,这几秒钟的延迟不会影响检测结果。StartTimeOutWorker 被触发一次,定时器就启用了。

}

//启用过后,对RemoveTimeOutWorker消息进行处理

case RemoveTimeOutWorker=>{

//由于该消息是一个object ,因此可以直接写入

//这里需要检测哪些worker心跳超时,并从map中删除。这里

需要充分利用scala函数式编程的特点。

//思路:

首先我们需要考虑何谓超时,所谓超时是指此时检测的时间减去上一次数据更新时间,若这个差值超过了某一范围,即称为超时。理论上这个值只要大于3秒即为超时,但是考虑到网络延时等原因,可以将其设为6秒,且一般情况下,服务器应当在同一机房,因此这3秒的延时对于数据的整体影响不大。

也就是now - lastHeartBeat > 6000即算为超时。          

// 首先将workers所有的workerinfos信息取出来(val workers = mutable.Map[string,WorkerInfo]),因为 WorkerInfo 的 id 与string相同,所以我们这里不考虑其他信息

val workerInfos = workers.values    

// 获取当前时间now

val nowTime = System.currentTimeMillis()

// 先将所有的超时的workerinfo过滤出来,然后删除

workerInfos.filter(workerInfo=>(nowTime-workerInf

o.lastHeartBeat ) > 6000))  .foreach(workerInfo=>work

ers.remove(workerInfo.id))

//若条件“两个值的差值大于6000”一旦被满足,该worker就会被过滤出来删除

//该段代码可能较难理解,简言之,可以理解为workerInfos是当前map中的所有workerInfo,也就是之前图示中表示的各worker的“信息”(不包含id),

通过workerInfo=>(nowTime-workerInfo.lastHeartBeat) >

6000函数将满足该条件的“信息”过滤出来,这部分“信息”即是要准备删除的内容,且该函数中的 workerInfo是从workerInfos取出的。

foreach(workerInfo=>workers.remove(workerInfo.id))是对过滤出的信息组合为一个集合的操作,

且此处的workerInfo仅为组合的集合的名称,可以进行更改,如改为foreach(worker=>workers.remove(worker.id))亦可。

前面的函数整的workerInfo同样如此,且这两个函数中的值亦无关。

println("当前有"+ workers.size+"个 worker存活的")

}

}

}

//运行代码检查代码有无bug,当然写完整段代码无需检测客户

端,因为目前检测任务与客户端没有任何关系。且此时两个时间分

别设为了9秒和6秒,是在基于网络延时的考虑下将时间设置的

宽裕了一些,更为安全。

关闭原先的 SparkMaster 与 SparkWorker,启动 SparkMaster,检测结果为“当前有0个 worker 存活的”,9秒之后再次弹出一个检测结果。再启动SparkWorker,结果显示该 SparkWorker每3秒钟发送一次心跳。每发送3次后,显示“当前有1个 worker 存活的”。

此时若使得S parkWorker 宕机,最先显示“当前有1个 worker 存活的”,接下来的检测结果显示“当前有0个worker存活的”。当然,启动两个 Master 也可以解决该问题。


四、总结

对于 Master 检测心跳并删除超时的worker这部分,我们进行了功能说明和思路分析,最后通过代码实现了这一功能。

对于思路分析,我们只分析服务端,具体分析见本讲义中第二部分中的示意图。

对于代码实现部分,我们仅是添加了两个协议,并修改服务器端的代码。

//更新了MessageProtocol.scala,即

//master给自己发送一个触发检查超时worker的信息

case object StartTimeOutWorker

//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令

case object RemoveTimeOutWorker

//这里只是为了套用Spark的格式,相当于做了过渡,也可以可以直接调用RemoveTimeOut,更加简洁。

//更新了SparkMaster.scala,即增加了两部分内容:

case "start" =>  {

println("master服务器启动了...")

//从这里开始正式编写代码

self  ! StartTimeOutWorker  

}

case StartTimeOutWorker = >

println(“开始了定时检测worker心跳的任务)

import context.dispatcher

//说明

// 1.  0millis:表示不延时,立即执行定时器

// 2.   9000millis:表示每9秒执行一次

// 3.   self:表示发给自己

// 4.  RemoveTimeOutWorker发送的内容

context.system.scheduler.schedule(0 millis,9000 mil

lis,self,RemoveTimeOutWorker)

//对 RemoveTimeOutWorker 消息进行处理

//这里需要检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除。

case RemoveTimeOutWorker=>{

// 首先将workers所有的workerinfos信息取出 val

workerInfos = workers.values

val nowTime = System.currentTimeMillis()

// 先将所有超时的workerinfo过滤出来,然后删除

workerInfos.filter(workerInfo=>(nowTime-workerInf

o.lastHeartBeat ) > 6000))  .foreach(workerInfo=>w

orkers.remove(workerInfo.id))

println("当前有"+ workers.size+"个 worker存活的")

}

以上就是定时检测超时walker并删除任务的所有内容。

相关文章
|
SQL 运维 关系型数据库
一款 SQL 自动检查神器,再也不用担心 SQL 出错了,自动补全、回滚等功能大全
一款 SQL 自动检查神器,再也不用担心 SQL 出错了,自动补全、回滚等功能大全
343 0
|
域名解析 缓存 负载均衡
在Linux中,自定义解析域名的时候,可以编辑哪个⽂件?是否可以⼀个ip对应多个域名?是否⼀个域名对应多个ip?
在Linux中,自定义解析域名的时候,可以编辑哪个⽂件?是否可以⼀个ip对应多个域名?是否⼀个域名对应多个ip?
|
Ubuntu Linux Shell
C++ 之 perf+火焰图分析与调试
【10月更文挑战第8天】在遇到一些内存异常的时候,经常这部分的代码是很难去进行分析的,最近了解到Perf这个神器,这里也展开介绍一下如何使用Perf以及如何去画火焰图。
217 1
|
前端开发 JavaScript API
MonacoEditor 加载很慢该怎么优化?
MonacoEditor 加载很慢该怎么优化?
2655 0
|
12月前
|
Web App开发 算法 安全
什么是阿里云WoSign SSL证书?_沃通SSL技术文档
WoSign品牌SSL证书由阿里云平台SSL证书合作伙伴沃通CA提供,上线阿里云平台以来,成为阿里云平台热销的国产品牌证书产品。
2332 2
|
存储 Prometheus 监控
Redis 调优指南:提高性能和稳定性的全面策略
Redis 调优指南:提高性能和稳定性的全面策略
881 0
|
开发框架 前端开发 JavaScript
前后端分离,Asp.net core webapi 如何配置跨域
前后端分离,Asp.net core webapi 如何配置跨域
361 0
|
缓存 Java Spring
Guava缓存工具类封装和使用
Guava缓存工具类封装和使用
360 0
|
机器学习/深度学习 数据可视化 TensorFlow
Python深度学习基于Tensorflow(4)Tensorflow 数据处理和数据可视化
Python深度学习基于Tensorflow(4)Tensorflow 数据处理和数据可视化
175 3
|
存储 SQL NoSQL
案例实践:某券商从 Neo4j 迁移至悦数图数据库
许多国内企业在早期使用 Neo4j 作为图相关业务场景的探索基础设施。然而,随着业务发展和环境变化,原有的图数据库已经逐渐无法满足不断发展的特定业务场景需求。如何将 Neo4j 迁移到一款兼具良好扩展性、性能、专业服务能力的图产品,已成为业界普遍关心的问题。本文将为您呈现华东某大型券商从 Neo4j 迁移至悦数图数据库的选型、迁移和提升能力的完整历程,供广大企业参考。
案例实践:某券商从 Neo4j 迁移至悦数图数据库