开发者学堂课程【Scala 核心编程 - 进阶:Spark worker 注册功能完成】学习笔记,与课程紧密连接,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/610/detail/9124
Spark worker 注册功能完成
首先完成 walker 的注册功能,完成后要接收到 walker 注册成功的消息,程序图如下。
程序图有指导意义,稍后使用。接下来开始走代码,打开先建一个文件夹,新建一个包,名字叫做 sparkmasterworker ,同样的道理,也建三个包,先写一个测试的稍后会用到。
先建第一个包,是 surver 端,即 master 里。另外一个是客户端,直接叫 master 也可以。另一个叫 worker ,还有一个通用的部分是 common ,因为他要通讯好,这样就分成三个部分。三个部分,先写master ,第一个叫Sparkmaster ,给它一个类。然后仍然是 Object ,后面换成主函数。
然后继承一个 actor ,继承后去重写这个方法,把基本的代码走完。走完代码后,先创建ActorSystem ,这个配置的IP地址,沿用之前的即可。
直接写为 config ,里面有IP地址和端口,端口是1005。稍后全部改成灵活的部分,将它引一下,然后创建 ActorSystem ,叫Sparkmaster ,把 config 传进入,传入后写一个 val ,前面叫SparkMasterSystem ,写完之后创建一个Sparkmaster-actor ,因为它不是主动方,所以无需先持有对方的引用,也拿不到,即人家找你可以,你找别人是找不到的。
然后sparkMasterSystem.加位置位置,这些代码前面见过,不新鲜,直接拿过来起个名字,叫 sparkmaster ,然后引入. val ,里面要引入它的引用,现在先启动sparkMaster ,sparkMasterRef 发出一个消息叫 start ,写这个代码case start ,如果接收到一个消息“ master 服务器启动了”,等待别人连接,这个启动这段配置先简单测试一下有没有跑起来,运行过程显示目前已经启动了,1005也处于监听状态,没有问题。
package com.atguigu.akkasparkmasterworkermaster
import akka.actor.Actor
class SparkMaster extends Actor{
override def receive:Receive={
case "start" => println("master服务器启动了...")
}
object SparkMaster {
def main(args: Array[string]): Unit = {
//先创建Actorsystem
val config=ConfigFactory.parsestring(
S"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hastname=127.0.0.1 |akka.remote.netty.tcp.port=10005
""".stripMargin)
val sparkMastersystem =Actorsystem("sparkMaster",config)
//创建SparkMaster -actor
val sparkMasterRef =sparkMastersystem.actorof(Props[sparkMasterl, "sparkMaster"
//启动sparkMaster
sparkMasterRef !"start"
}
}
继续写,因为离我注册成功还有很多步,拿到的码先分析,不要乱写,出现了问题,worker一旦启动会发出一个注册的消息,而服务器会管理很多客户端,显然应该有 map 来管理,那 map 该放在哪里比较合适?
是放在 actor 里,而且是可变的 HashMap ,有一个mutable.HashMap 的属性,不可忘记。由它管理和维护一系列的 worker ,有多少都可管理,也有机制,如最多一个 master 管1000个,如果再多,发一个协议,请找另外一个,不接收。
拿到IP地址后找另外一个 master ,学 reduce 时知道哨兵机制,还有检测机制,心跳机制都是有的,其实底层都相同。
写完过后有了这个机制,应该写 worker ,打开 worker 写这个代码,同样写一个SparkWorker 。
直接写 object SparkWorker ,以主函数的方式来做,同样继承 actor ,另一边要实现 receive 的方法,偏函数也习惯写上去,接下来仍然是和小黄鸡客服的服务器一样的道理,前面IP地址已有,配置直接拿来先用,首先是walkerhost ,是本地的IP地址,也写 walkerPort ,
这边端口按顺序,如1001,1002,1003,但是不能1005,1005其实也可以用,因为不在同一个机器上,理论是可以用的,下面还用大家熟悉的方式,这里写的简单一点,直接把这个写出,但是和前面讲的不太一致,其实是一样的,相当于说我直接把 masterURL 写死了,这样肯定不好,所以还改为原先的较灵活的方式。
写为 masterHost ,mustarHost 目前是127.0.1,注意将来这个参数是要传入的,要根据实际情况来定,因为没有两台机器无法演示,将来此参数是要灵活的在命令行里面输入好,紧接着再写一个masterPort ,刚才 master 的端口写的为1005,因此这里也写1005。
这边引入一个包包,下面代码就创建actor system ,这个较容易,同样取个名字,跟前面比对叫Sparkworker ,为了以示区别呢,写个01,理论上可以,也可以不行,因为将来两个 worker 是不一样的,所以写了同一个名字,在不同机器上也没有问题啊,然后把 config 写入,然后 var,找到 sparkWorkerSystem 。
下面显然去创建 sparkwalker的引用,actor 的引用,和刚才的思路一样,serverHost:String ,叫 master不叫 server ,叫master更明确一点,masterHost 主机和masterPort ,写一个 int ,写完可以看到应该有一个值叫 master 的代理,取这个名字,在这里它叫 Porxy 代理,这种名称其实就是前面讲的 refer ,所以底层源码看到是 Porxy 代表 ref 。
同样的道理,另一边叫ActorSelection ,写一个下划线。同样接着进行初始化任务,初始化master的代理,这个代理就是引用,只是名称发生了变化,Proxy是底层的代理,就是master的代理及引用ref。做起来非常简单,masterproxy直接使用context. ,以前用过这个方法叫路径,就可以用刚才看到的路径拿来所用,后面再把它做灵活一点,后面写上 user ,稍后不要忘记换个名字吧,叫01,因为将来master可能有多个,因此换一个叫 sparkmaster-01,写上01就找到了,找到后要做一个处理。
前面有个错误,不应该是 val ,应该是 var,现在要替换一下,因为这两个值是传进去的,所以写masterHost ,是 master 的一端口将其扔进去,这是初始化任务。服务器端要发出 star 的请求,开始运行,因此创 sparkwalker 的引用或代理,如何去创建呢?
刚才 spark. actorOf ,Props 用 new 的方式处理,因为要传东西进去,所以 sparkwalker 传入 masterHost 和 masterPort 的端口,取名字叫 sparkwalker-01 ,这是第一个,此时.vr ,得到 actorRef 的引用啊,写全一点叫 sparkwalker 的引用,更清晰一点,稍后启动 actor ,较简单,写一个 start 启动了,启动后要处理得到的 start ,此时工作任务有两个,第一个先提示启动了,第二个发出一个注册的信息,提示一句话“ walker 启动了”,因为代码一个地方错了都会出问题,因此先运行确定能否打印出,而且 masterproxy是否正确,做一个简单的验证,写一部分测一下,写一个 prestart 启动测试,调用检测代码是否正确的运行,保持所有,先启动 sparkmaster 是没有任何的问题,再启动walker 也没有任何问题,地址也没有问题,继续往下走,发出一个注册请求,继续写协议了,spark 底层就是这么写的,那spark 底层发送消息,要订协议就按照规范来定义,协议前面已经解释过直接拿来用,暂时定了两三个协写入Messageprotocol ,第一个协议有一个registerworkerinf ,是注册时发给服务器阐明ID是多少号,CPU有多少,内存多少,是人为指定的,也可以加其他信息,如服务器的IP地址是什么,也加些其他信息,这是 walker 信息。第二个是 workerinfo ,目前该workerinfo 和RegisterWorkerinfo 完全一样,这个信息将来是保存到master 中的 HashMap 中,因为将来要管理需有 master 信息,不可以直接放进去,因为将来信息还会有扩展。
例如,将来一定会保存该worker 的最近一次心跳,所以把它分开写,是一个合理的现象,包括 spark 里层,也是这样去分别。该hashmap 用于管理 worker ,将来这个信息会扩展,将来这个是会被扩展的,如增加 worker 上一次的心跳时间。
没有写 case 是因为没有必要写,因为在整个通讯机子里,该 workerinfo 不参与,相对于到里面重新打一个对象放进去,因此没有加 case ,这两个是协议,相当于会用到的类型系。下面加了 object ,加了最大的区别是不需要再加字段了。RegisteredWorkerinfo 仅仅表明注册成功,如果一旦注册成功,服务器打一个这样的实例,即当 worker 注册成功,服务器返回一个这样的对象。因此用的是 object ,但如写 class 也没问题,有点多余。
package com.atguigu.akka.sparkmasterworker.common
//worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String,cpu: Int, ram: Int)
//这个是workerInfo,这个信息将来是保存到master的hm(该hashmap是用于管理worker)
//将来这个workerInfo会扩展(比如增加worker上一次的心跳时间)
class workerInfo(val id: strine, val cou: Int, val ram: Int
//当worker注册成功,服务器返回一个RegisteredworkerInfo 对象
case object RegisteredWorkerInfo
接下来发一个注册消息,走的这个协议为RegisterWorkerinfo ,信息发出去在 walker 里,需要发个消息,因为已经持有引用了,怎么发都可。刚才写的registerwalkerinfo 这个信息,一共有三个参数要传入,第一个是ID,做一个随机数传入,做成一个字符串的,一般做成一个随机数,也可以做其他。
如 val id =java.util.UUID.randomUUID。这里ID是随机的,将ID填入。CPU随便写如CPU是16,内存为16个g,1024,这样就传过去了。
系统出现报错,前面写错了,要 import ,就发到了服务器。经过网络发到了服务器,经过底层的消息机制,即可接收到 worker 发送的注册信息,取得注册信息后,打开 master ,接收到register 的注册信息,信息取到,因为用的是对象提取器, CPU ,还有内存,现在接收到客户端 worker 的注册信息,将它装入到 Hashmap 中,因为要管理,因此定义一个 hm 去管理 workers ,所以名字叫val workers = mutable map ,map 要指定一个类型,第一个是 string ,做ID;第二个存入 workerinfo ,里面保存的是每一个worker 的信息,引相应的包好,要引可变的包。
此时有了 Hashmap ,也知道要点,因此接到 worker 信息后要先判断里面有没有,即 workers . contains 里面是否有传入的ID,若没有则要注册进去,注册时先创建一个 workerinfo 对象,注意此时看起来 ID ,CPU 一样,但是将来要扩展,所以一般底层都会分开,直接写个 workerInfo = new workerinfo ,这里是另一个,因为不是一类,将 ID,CPU ,内存传入,CPU,然后加入到 workers ,workers +=( ),将传入的ID加入,workerinfo 是放进去的信息,切记要加括号,因为括起来才代表一个原宿,如果二项组放入不匹配。之前笔记讲解过,可以用箭头的方式。这个 workers 就放进去了,切记写完之后,还需回一个注册成功, spark 底层给 worker 回复消息,即回消息说注册成功。
直接 sender ( ) !RegistereWorkerInfo ,引包就可以了。返回后拿到 worker 会得到信息,如果拿到RegistereWorkerInfo 则注册成功,则说明是谁,注册成功,即 workerid =“+id +“注册成功”,把 Hashmap 打出看有多少东西,如目前有多少个 workers 。
进行运行先启动 master ,启动后再启动 worker ,worker 启动后去注册,注册成功得到消息注册成功。运行后确实返回了一个这样的信息注册成功,在 master 也启动成功。注册信即完成。
本节课第一个功能完成了代码的实现,思路图如下
具体代码实现结构分别呢写了三个文件,一个 message ,一个 spark ,一个 worker
具体代码如下
//SparkMaster.scala
package com.atguigu.akka.sparkmasterworker.master
import akka.actor.{Actor, ActorSystem, Props}
importcom.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo,Registered WorkerInfo WorkerInfo}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
class SparkMaster extends Actor {
//定义个 hm.管理 workers
val workers = mutable.Map[String, WorkerInfo ]( )
override defreceive: Receive ={
case "start" => println("master 服务器启动了..." )
case RegisterWorkerInfo(idcpuram)=>{
//接收到worker注册信息
if(!workers.contains(id)){
//创建WorkerInfo 对象
val workerInfo = new WorkerInfo(id,cpu,ram)
//加入到 workers
workers +=((id,workerInfo))
println("服务器的 workers-" + workers)
//回复一个消息,说注册成功
sender)!RegisteredWorkerInfo
}
}
object SparkMaster {
def main(args: Array[String]): Unit={
//先创建 ActorSystem
val config =ConfigFactory.parseString
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=127.0.0.1 |akka.remote.netty.tcp.pot=10005
""".stripMargin)
val sarkMasterSvstem=ActorSvstem("SparkMaster". config)
//创建SparkMaster-actor
valsparkMasterRef=sparkMasterSystem.actorOf(PropsSparkMaster],"SparkMaster-01")
//启动 SparkMaster
sparkMasterRef!"start"
}
}
//SparkWorker.scala
import akka.actor.{Actor,ActorSystem,Props}
importcom.atguigu.akka,sparkmasterworkercommonRegisteredworkerInfo
import com.typesafeconfig.ConfigFactory
class sparkworker(masterHost:String,masterPort:Int) extends Actorf
//masterProxy是Master的代理/引用ref
var masterporxy:Actorselection =_
val id = java.util.UUID.randomUUID().tostring
override def prestart(): Unit = {
println("prestart()调用")
//初始化masterPorxy
masterPorxy = context.actorSelection(s"akka
.tcp://sparkMaster@${masterHost}:${masterPort}/user/SparkMaster-01")
println("masterProxy=" + masterPorxy)
}
override def receive:Receive = {
case "start" => [
printin("worker启动了")
//发出一个注册消息
masterPorxy ! RegisterworkerInfo(id, 16, 16 * 1024)
}
case RegisteredWorkerInfo => {
println("workerid="+id +"注册成功~")
}
}
}object sparkworker {
def main(args: Array[String]):Unit={
val workerHost ="127.0.0.1" val workerport =10001
val masterHost ="127.0.0.1" val masterport =10005
val config=ConfigFactory.parsestring(
s’’’’’’
val spark WorkerRef sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost
masterPort)),"SparkWorker-01")
//启动 actor
sparkWorkerRef! "start"
}
}
}