Master 检测心跳并删除超时的 Worker | 学习笔记

简介: 快速学习 Master 检测心跳并删除超时的 Worker

开发者学堂课程【Scala 核心编程 - 进阶Master 检测心跳并删除超时的 Worker学习笔记,与课程紧密连接,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/610/detail/9126


Master 检测心跳并删除超时的 Worker


内容介绍:

一、功能要求

二、思路分析

三、程序代码

四、总结


一、功能要求

Master 启动定时任务,定时检测注册的 worker 中有哪些 worker 没有更新心跳。对于已经超时的 worker,Master 将其从 hashmap 中删除。


二、思路分析

在之前的课程中,我们曾搭建过一个关于 Master 和 Worker 的程序关系框图,这节课我们将继续结合之前搭建的程序关系框图来理解 Master 启动定时任务的原理和过程,在该分析过程中不考虑客户端,仅分析服务端即可。

如下图:

image.png

Master 维护了很多 hashmap,而 hashmap 中有很多 worker信息。

如图上所示,workers即 所有的 map 其中管理了许多 worker,其中包含了这些worker的信息,如图中的 worker id1信息、worker id2信息、worker id3信息等。

Hashmap 与这些 workers 是相关联的。假如客户端对应的worker已经许久没有更新心跳,而 Master 管理的 worker 应有实际意义,没有心跳就失去了其意义,此时可以将其清空。

要想将其清空,Master 需要进行相关处理,即定时检测管理的 worker 的心跳是否正常。如果心跳已经超时,即处于异常状态,此时就需要删除任务。

因此,在 Master 中应有自行启动定时任务的程序,让其反复地检测worker的心跳是否正常,是否超时,我们会启动定时检测的任务。在该图上用深绿色标注定时任务定时器。

直至此时,我们所完成功能与跟客户端没有任何关系,也就是说现在检测任务只是发生在服务器端。现在的问题在于如何去启动定时器?

应该在图中“start表示Master工作”部分启动较为合适,一旦Master启动,就立即检测,从此处开始触发条件命令服务器开始执行检测任务或者命令Master启动定时任务。


三、程序代码

从左侧的项目栏里快速找到Master,代码如下:

package com.lxb.sparkmasterworker.master

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

import com.atguigu.akka.sparkmasterworker.commen.

import com.typesafe.config.ConfigFactory

import scala.collection.mutable

//引入一个需要的包

import scala.concurrent.duration.

Class SparkMaster extends Actor

//定义一个hm,管理workers

val workers = mutable.Map[string,WorkerInfo]()

override def receive: Receive = {

case "start" =>  {

println("master服务器启动了...")

//从这里开始正式编写代码

//我们曾经预设了两个协议,可以直接使用,也可以后期简化并作一步

//MessageProtocol.scala

//master给自己发送一个触发检查超时worker的信息

case object StartTimeOutWorker

//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令

case object RemoveTimeOutWorker

//以上协议命令是按照Spark格式书写的,可以直接简化定时出发RemoveTimeOutWorker 请求即可。这两条命令都是object,说明其中不用带有信息,只是说明应执行的任务即可。

//接下来开始书写正式的代码

self  ! StartTimeOutWorker  // Master发送消息给自己

case RegisterWorkerInfo(id,cpu,ram) = > {

//接收到Worker的注册信息

If(!workers.contains(id)) {

//创建workerInfo 对象

val workerInfo =new WorkerInfo(id,cpu,ram)

//加入到workers

workers + = ((id,workerInfo))

println("服务器的workers="+workers)

//回复一个消息,说注册成功

send ()!RegisterWorkerInfo

}

}

}

case HeartBeat (id) => {

//更新对应的worker的心跳时间

//从worker取出workerInfo

val  workerInfo = workers(id)

workerInfo.lastHeartBeat =System.currentTimeMillis()

println (“master更新了”+ id +“心跳时间…”)

}

//该段命令与 self  ! StartTimeOutWorker 承接,为了不破坏命令请求的执行,将其放在最后面。

case StartTimeOutWorker = >

println(“开始了定时检测worker心跳的任务)

//如果收到了StartTimeOutWorker 信息,则说明此时要准备开始执行检测任务。而该阶段后期可能还要执行一些单独的工作,所以与前面的代码分开来写有一定的好处

//如何定时启动?

思路与原先相同,原先用到了dispatcher转换,此处只要将原先

的代码略作修改即可。

//首先引入一个需要的包(import scala.concurrent.durati

on. 为了方便读取,放在了代码最开始)

import context.dispatcher

//说明

// 1.  0millis:表示不延时,立即执行定时器

// 2.   9000millis:表示每9秒执行一次

// 3.   self:表示发给自己

// 4.  RemoveTimeOutWorker发送的内容

context.system.scheduler.schedule(0 millis,9000 mil

lis,self,RemoveTimeOutWorker)        

// 此处将原本的HeartBeat改为了RemoveTimeOutW

orker。

且将原本的3秒改为9秒,因为worker原本3秒更新一次数据,而如果3秒检测一次间隔时间会偏短,可以适当延长检测的时间,如9秒,这几秒钟的延迟不会影响检测结果。StartTimeOutWorker 被触发一次,定时器就启用了。

}

//启用过后,对RemoveTimeOutWorker消息进行处理

case RemoveTimeOutWorker=>{

//由于该消息是一个object ,因此可以直接写入

//这里需要检测哪些worker心跳超时,并从map中删除。这里

需要充分利用scala函数式编程的特点。

//思路:

首先我们需要考虑何谓超时,所谓超时是指此时检测的时间减去上一次数据更新时间,若这个差值超过了某一范围,即称为超时。理论上这个值只要大于3秒即为超时,但是考虑到网络延时等原因,可以将其设为6秒,且一般情况下,服务器应当在同一机房,因此这3秒的延时对于数据的整体影响不大。

也就是now - lastHeartBeat > 6000即算为超时。          

// 首先将workers所有的workerinfos信息取出来(val workers = mutable.Map[string,WorkerInfo]),因为 WorkerInfo 的 id 与string相同,所以我们这里不考虑其他信息

val workerInfos = workers.values    

// 获取当前时间now

val nowTime = System.currentTimeMillis()

// 先将所有的超时的workerinfo过滤出来,然后删除

workerInfos.filter(workerInfo=>(nowTime-workerInf

o.lastHeartBeat ) > 6000))  .foreach(workerInfo=>work

ers.remove(workerInfo.id))

//若条件“两个值的差值大于6000”一旦被满足,该worker就会被过滤出来删除

//该段代码可能较难理解,简言之,可以理解为workerInfos是当前map中的所有workerInfo,也就是之前图示中表示的各worker的“信息”(不包含id),

通过workerInfo=>(nowTime-workerInfo.lastHeartBeat) >

6000函数将满足该条件的“信息”过滤出来,这部分“信息”即是要准备删除的内容,且该函数中的 workerInfo是从workerInfos取出的。

foreach(workerInfo=>workers.remove(workerInfo.id))是对过滤出的信息组合为一个集合的操作,

且此处的workerInfo仅为组合的集合的名称,可以进行更改,如改为foreach(worker=>workers.remove(worker.id))亦可。

前面的函数整的workerInfo同样如此,且这两个函数中的值亦无关。

println("当前有"+ workers.size+"个 worker存活的")

}

}

}

//运行代码检查代码有无bug,当然写完整段代码无需检测客户

端,因为目前检测任务与客户端没有任何关系。且此时两个时间分

别设为了9秒和6秒,是在基于网络延时的考虑下将时间设置的

宽裕了一些,更为安全。

关闭原先的 SparkMaster 与 SparkWorker,启动 SparkMaster,检测结果为“当前有0个 worker 存活的”,9秒之后再次弹出一个检测结果。再启动SparkWorker,结果显示该 SparkWorker每3秒钟发送一次心跳。每发送3次后,显示“当前有1个 worker 存活的”。

此时若使得S parkWorker 宕机,最先显示“当前有1个 worker 存活的”,接下来的检测结果显示“当前有0个worker存活的”。当然,启动两个 Master 也可以解决该问题。


四、总结

对于 Master 检测心跳并删除超时的worker这部分,我们进行了功能说明和思路分析,最后通过代码实现了这一功能。

对于思路分析,我们只分析服务端,具体分析见本讲义中第二部分中的示意图。

对于代码实现部分,我们仅是添加了两个协议,并修改服务器端的代码。

//更新了MessageProtocol.scala,即

//master给自己发送一个触发检查超时worker的信息

case object StartTimeOutWorker

//master给自己发消息,每隔一段时间检测worker,对于心跳超时的,执行删除命令

case object RemoveTimeOutWorker

//这里只是为了套用Spark的格式,相当于做了过渡,也可以可以直接调用RemoveTimeOut,更加简洁。

//更新了SparkMaster.scala,即增加了两部分内容:

case "start" =>  {

println("master服务器启动了...")

//从这里开始正式编写代码

self  ! StartTimeOutWorker  

}

case StartTimeOutWorker = >

println(“开始了定时检测worker心跳的任务)

import context.dispatcher

//说明

// 1.  0millis:表示不延时,立即执行定时器

// 2.   9000millis:表示每9秒执行一次

// 3.   self:表示发给自己

// 4.  RemoveTimeOutWorker发送的内容

context.system.scheduler.schedule(0 millis,9000 mil

lis,self,RemoveTimeOutWorker)

//对 RemoveTimeOutWorker 消息进行处理

//这里需要检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除。

case RemoveTimeOutWorker=>{

// 首先将workers所有的workerinfos信息取出 val

workerInfos = workers.values

val nowTime = System.currentTimeMillis()

// 先将所有超时的workerinfo过滤出来,然后删除

workerInfos.filter(workerInfo=>(nowTime-workerInf

o.lastHeartBeat ) > 6000))  .foreach(workerInfo=>w

orkers.remove(workerInfo.id))

println("当前有"+ workers.size+"个 worker存活的")

}

以上就是定时检测超时walker并删除任务的所有内容。

相关文章
|
8月前
|
Java Windows
JavaWebSocket心跳机制详解
WebSocket是一种在Web浏览器和服务器之间进行全双工通信的协议,它提供了一种简单而强大的方式来实现实时数据传输。在使用WebSocket时,心跳机制是非常关键的,它能够保持连接的稳定性并及时发现连接的异常。本文将详细解释JavaWebSocket心跳机制的实现原理和步骤。
239 0
|
6月前
|
NoSQL Redis 容器
Redis集群报错cluster_state:fail,如何解决并重新恢复集群(IP问题/ slot未完全分配问题)
Redis集群报错cluster_state:fail,如何解决并重新恢复集群(IP问题/ slot未完全分配问题)
110 0
|
9月前
|
关系型数据库 MySQL Shell
检查MySQL主从同步状态
检查MySQL主从同步状态
64 1
|
分布式计算 Scala Spark
Spark worker 定时更新心跳 | 学习笔记
快速学习 Spark worker 定时更新心跳
152 0
Spark worker 定时更新心跳 | 学习笔记
|
网络协议 Linux Scala
指定 Master 与 Worker 的启动参数 | 学习笔记
快速学习指定 Master 与 Worker 的启动参数
104 0
心跳 —— 超时机制分析
在C/S模式中,有时我们会长时间保持一个连接,以避免频繁地建立连接,但同时,一般会有一个超时时间,在这个时间内没发起任何请求的连接会被断开,以减少负载,节约资源。并且该机制一般都是在服务端实现,因为client强制关闭或意外断开连接,server端在此刻是感知不到的,如果放到client端实现,在上述情况下,该超时机制就失效了。本来这问题很普通,不太值得一提,但最近在项目中看到了该机制的一种糟糕的实现,故在此深入分析一下。
762 0
心跳 —— 超时机制分析
Giraph源码分析(四)—— Master 如何检查Worker启动成功
本文的目的 说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。 环境 在单机上(机器名:giraphx)启动了2个workers。 Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。
|
MySQL 关系型数据库
master/slave 相同server_id引起的同步失败
昨天在做MySQL SwitchOver遇到一个诡异的想象,切换前后的结构图如下: 当我把一切都切换好之后,应其他需求,重启了04上的mysql,然后show slave status\G发现报错: Last_IO_Error: Fatal error: The slav...
919 0
|
NoSQL 测试技术 Redis
Redis集群master选举时长测试
在一台物理机上启动6个Redis实例,组成3主3从集群,端口号依次为:1379 ~ 1384,端口号1379、1380和1384三个为master,端口1379的进程ID为17620。现将进程17620暂停(发送SIGSTOP信号),观察集群发现故障时长,和主从切换时长。
2246 0