Spark worker 定时更新心跳 | 学习笔记

简介: 快速学习 Spark worker 定时更新心跳

开发者学堂课程【Scala 核心编程 - 进阶Spark worker 定时更新心跳学习笔记,与课程紧密连接,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/610/detail/9125


Spark worker 定时更新心跳


前一节完成到了 sparkworker 可以向 master 完成注册,服务器可以将这个worker 保存到它的 map 中,这是第一个功能。

image.png

下面来完成第二个功能,即 worker 定时发送心跳,因为 walker 在工作的过程中,有可能因为网络原因或 worker 本身运行的原因导致瘫痪或宕机,作为服务器的管理者,应知道哪一台 worker 被宕机,甚至可以将宕机的模块进行再次的管理,因此要实现 worker 可以定时发送心跳给 master 的效果,master 接收到以后要更新 worker 上一次的心跳时间,需在示意图中加逻辑,即worker 会定时发送心跳,定时发送的信息假设确定为三秒,这个时间可以设置。

根据前期的业务逻辑,发送信息也是通过网络传递的。发送心跳得到后,会进行处理,即接收到处理心跳时间的更新任务后,从 map 中取出该 worker 更新上一次心跳时间。

显然需要定义新的协议,按照顺序走,首先要解决的问题是如何完成定时的发送心跳,因此要做一个定时器,做定时干一件事情的任务,引出定时器的问题。

如下图为代码大体的流程,worker 到时间会写context system,里面有个scheduler,有四个参数稍后解释,先发一个SendHeartBeat 的地址,这里3000毫秒等于三秒,三秒去发一个 SendHeartBeat 的消息,这个消息会触发 case 语句,然后用masterProxy 发送一个 HearBeat 给服务器,而 SendHeartBeat 是触发上述行为,因此至少目前需要定义两个,一个是SendHeartBeat ,一个是 HearBeat ,SendHeartBeat 是一个样例类,拿到以后进行更新即可。

image.png

代码非常简单,具体如下:

首先在 Message 里增加两个协议,这个协议是 worker 每隔一定时间由定时器发给自己一个消息,因为是表示出发,因此用 object 。下面是 worker 每隔一定时间由定时器触发,向 master 发送的协议消息,这个消息要带上自己的ID号。这里, workerinfo 要做一个改进,因为目前为止这个 workerinfo 里面是没有包含记录上次心跳的时间,因此要加一个信息,写一个car lastHeartBeat 即上一次的心跳的时间,选择 Long 这个类型,默认0,也可以给一个默认值或直接用当前的时间来做。

获取当前时间也很简单,有一个方法叫 system current time ,就可以拿到,因此也可以初始化,system.currentTimeMillis ,后面不停地更新。

思路有了之后,找到 worker 加代码就可以。触发这个行为最好在注册完毕后,当注册成功后就定义一个定时器,每隔一定时间发送 SendHeartBeat 消息给自己。

做一个注释,需要引入一个包,import scale.concurrent.duration. 里面有实验单位,这样用就没问题了。

SendHeartBeat 要引入,引完后这句话要做一个解释,做一点简短的说明,对于第一个参数0代表立即执行这个定时器,假设写了1000代表一秒以后再出发,写了0则表示立即出发,不延时立即执行定时器。

第二个3000毫秒表示每隔三秒执行一次,即每隔三秒发一次这个信息由系统定时器来完成的。

第三个 self 表示发给自己。

第四个 SendHeartBeat 即发送的内容。有了这些信息后,如果得到了这个信息,则向服务器发送协议,协议前面讲了为 heartbeat ,要带上自己的ID号,因为要知道是谁来进行更新,

因此写 masterporxy HeartBeat ,引入刚才写的 HeartBeat ,把事先有的ID传入,此时每隔一定时间就发消息过去了,可以打一个信息,worker=“+ID+“给 master 发送心跳”心跳。

另一边接收到心跳更新后,打开 master 做一个行为,做处理 case ,如果拿到了 heartbeat 这个信息,要带 id ,拿到了要更新对应的 worker 的心跳时间,要更新心跳时间,首先要拿到当前时间线的第一步从 workers 取出workerinfo ,从map里面取一个东西出来以前最简单的方式直接写就可以了, val 取出workerinfo ,取出后再获取当前时间,把当前时间附进去即可,workerinfo . lastHeartBeat =System . currentTimeMillis ,然后更新了,那更新我输出一句话服务器或 master 更新了,拿了谁谁谁的这个新跳master更新了ID的心跳时间。

核对逻辑是否正确,首先注册成功就触发了定时器,每隔三秒发一个 heartbeat ,heartbeat 在 master 里拿到过后,将对应的workinfo 取出,然后将上一次心跳时间进行更新,并输出一句话。

下面测试代码能否正确的运行,先运行master ,发现提示了服务器已经启动,没有问题。

继续启动 worker ,启动以后发送了一个心跳,每隔三秒发送一次心跳给master , 而master 在对应的时间也更新了心跳。

将代码简单的整理如下:

1.功能要求: worker 定时发送心跳给 Master , Master 能够接收到,并更新 worker 上一次心跳时间。

2.思路分析(程序框架图)

image.png

3.代码实现

(1)//在MessageProtocol.scala中增加了对应的协议

// 这个是 WorkerInfo,这个信息将来是保存到master的 hm(该 hashmap 是用于管理 worker)

// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)

class WorkerInfo(val id: Stringval cpu: Intval ram:Int) {

varlastHeatBeat:Long=Svstem.curentTimeMillis

}

//当worker 注册成功,服务器返回一个RegisteredWorkerInfo对象

case object RegisteredWorkerInfo

//worker 每隔一定时间由定时器发给自己的一个消息

case object SendHeartBeat

//worker 每隔一定时间由定时器触发,而向 master 发现的协议消息

case class HeaitBeatid: String)

(2) //更新SparkWorker.scala

case RegisteredWorkerInfo=>{

println("workerid="+id+"注册成功~")

//当注册成功后,就定义一个定时器,每隔一定时间,发送 SendHeartBeat 给自己

import context.dispatcher

//说明

//1.0 millis 不延时,立即执行定时器

//2.3000 millis 表示每隔3秒执行一次

//3.self:表示发给自己

//4.SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis,3000 millisselfSendHeartBeat)

}

case SendHeartBeat=>{

println("worker="+id+"给master发送心跳")

masterPoixy ! HeatBeat(id)

}

(3) //更新 SparkMaster.scala

case HeartBeat(id) =>{

//更新对应的worker 的心跳时间

//1.从 workers 取出workerInfo

val workerInfo =workers(id)

workerInfo.lastHeartBeat=Svstem.currentTimeMillis( )

println("master 更新了"+id+"心跳时间.….")

}

相关文章
|
4月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
30 0
|
5月前
|
Python
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
199 0
|
分布式计算 大数据 Spark
高级特性_闭包_Spark 闭包分发 | 学习笔记
快速学习 高级特性_闭包_Spark 闭包分发
高级特性_闭包_Spark 闭包分发 | 学习笔记
|
分布式计算 大数据 Spark
Spark 原理_总体介绍_物理执行图 | 学习笔记
快速学习 Spark 原理_总体介绍_物理执行图
107 0
Spark 原理_总体介绍_物理执行图 | 学习笔记
|
分布式计算 大数据 数据处理
Spark 原理_总体介绍_逻辑执行图 | 学习笔记
快速学习 Spark 原理_总体介绍_逻辑执行图
116 0
Spark 原理_总体介绍_逻辑执行图 | 学习笔记
|
分布式计算 大数据 调度
Spark 原理_总体介绍_集群环境 | 学习笔记
快速学习 Spark 原理_总体介绍_集群环境
Spark 原理_总体介绍_集群环境 | 学习笔记
|
分布式计算 大数据 Spark
Spark 原理_总体介绍_概要 | 学习笔记
快速学习 Spark 原理_总体介绍_概要
Spark 原理_总体介绍_概要 | 学习笔记
|
分布式计算 Hadoop 大数据
Spark 原理_总结介绍_案例编写 | 学习笔记
快速学习 Spark 原理_总结介绍_案例编写
108 0
Spark 原理_总结介绍_案例编写 | 学习笔记
|
分布式计算 大数据 调度
Spark 原理_运行过程_总结和流程 | 学习笔记
快速学习 Spark 原理_运行过程_总结和流程
105 0
Spark 原理_运行过程_总结和流程 | 学习笔记
|
缓存 分布式计算 Hadoop
Spark 原理_运行过程_概念介绍 | 学习笔记
快速学习 Spark 原理_运行过程_概念介绍
121 0
Spark 原理_运行过程_概念介绍 | 学习笔记
下一篇
无影云桌面