指定 Master 与 Worker 的启动参数 | 学习笔记

简介: 快速学习指定 Master 与 Worker 的启动参数

开发者学堂课程【Scala 核心编程 - 进阶指定 Master 与 Worker 的启动参数学习笔记,与课程紧密连接,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/610/detail/9127


指定 Master 与 Worker 的启动参数


内容介绍:

一、功能要求

二、代码实现

三、代码运行

四、课后作业

五、板书与总结


一、功能要求

目前我们代码还存在一定问题,worker 一旦启用,其运行的参数都是固定的。

如 Master 的 IP 地址是指定的,端口也是指定的,而这显然不合理的,因为在将来部署的时候,不可以允许其他人来修改源代码。所以涉及到的灵活性的参数,就应该在运行时将其做成一个参数传入再运行。

本节课要完成的任务 Master, Worker 的启动参数运行时的指定,而不是固定写在程序中。


二、代码实现

1、先改动 master

(1)原代码:

object sparkMaster{

def main(args: Array[string]): Unit = {

val config = ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider

|akka.remote.netty.tcp.hostname=127.0.0.1

|akka.remote.netty.tcp.port=10002

""".stripMargin)

val sparkMastersystem=Actorsystem("SparkMaster",config)

val sparkMasterRef=sparkMastersystem.actorof(Props[Spa

rkMaster],"SparkMaster-01")

sparkMasterRef !"start"

}

}

先分析一下应该将哪些参数做成动态指定,首先host应该指定一下,而不是固定的127.0.0.1;端口port也是可以改变的;甚至名字"sparkMaster"也可以改定,如一台机器可以运行多个Master也是有的;"SparkMaster-01"也是可以改定的。因此,共有4个参数可做成动态参数,并将其指定。

刚刚我们分析出有四个参数,这里我们可以灵活处理,暂时将SparkMaster值固定,也就是说这里我们要指定三个参数,即host、port和SparkMasterActor。

(2)修改后的代码:

object sparkMaster{

def main(args: Array[string]): Unit = {

//参数判断

if(args.length !=3) {

println("请输入参数:host port sparkMasterAcotor名字")

sys.exit()

//不满足条件即退出程序

}

//满足条件,在其中定义值

val host =args(0)

val port =args(1)

val name =args(2)

val config = ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider

|akka.remote.netty.tcp.hostname=$(host)

|akka.remote.netty.tcp.port=$(port)

""".stripMargin)

val sparkMastersystem=Actorsystem("SparkMaster",config)

val sparkMasterRef=sparkMastersystem.actorof(Props[Sp

arkMaster],s"$(name)")

sparkMasterRef !"start"

}

}

2、改动 worker

(1)原代码:

object SparkWorker {

def main(args: Array[string]):Unit ={

val workerHost = "127.0.0.1"

val workerport = 10001

val masterHost = "127.0.0.1"

val masterport = 10005

val config=ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=127.0.0.1 |akka.remote.netty.tcp.port=10002

""".stripMargin)

val sparkWorkersystem=Actorsystem("sparkWorker",config)

val sparkWorkerRef=sparkWorkerSystem.actorof(Props(ne

w SparkWorker(masterHost, masterPort)),"Sparkworker-01")

先来分析一共有几个参数要指定,至少应包含workerHost、workerport 、masterHost 与masterport 4个参数。

此外相关的名字也应指定,再增加SparkWorker-01与SparkMaster-01的名字,因此,至少有六个参数是待指定的。

(2)修改后的代码

object SparkWorker {

def main(args: Array[string]):Unit ={

if(args.length !=4){

printin("请输入参数:workerHost  workerPort workerName masterHost  masterPort  masterName")

sys.exit()

}

val workerHost = args(0)

val workerPort = args(1)

val workerName = args(2)

val masterHost =args(3)

val masterPort =args(4)

val masterName = args(5)

val config=ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=${workerHost} |akka.remote.netty.tcp.port=${workerPort}

""".stripMargin)

val sparkWorkersystem=Actorsystem("sparkWorker",config)//val sparkWorkerRef=sparkWorkerSystem.actorof(Props (new SparkWorker(masterHost,masterPort)),"Sparkwork

er-01")

//该代码显示错误,因为值类型不匹配,一方需要Int,一方需要

String。且"Sparkworker-01"也需要替换,因此,可改为:val sparkWorkerRef=sparkWorkerSystem.actorof(Props (new SparkWorker(masterHost, masterPort.to Int,master Name)),s"$(workerName)")

 //此代码仍存在问题,因为在构建Sparkworker的actorof时,原本的代码是固定的(.tcp://SparkMaster@${masterHost}:${masterPort}/user/"SparkMaster-01")    

println("masterProxy="+masterProxy)),因此应在前面的代码中增加一个参数

class Sparkworker(masterHost:string,masterPort:Int,mast

erName:string)extends Actor{

//masterName:string是增加的参数,原本的代码修改为.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}


三、代码运行

启动三个 worker 和一个 master 来运行。首先进行Master的设置,先找到SparkMaster 的 config,点击 Edit configurations,点击 program arguments 进入。

1.输入 Worker 的信息

输入 IP 地址127.0.0.1,端口10001(若 Master 与 Worker 在同一台机器上,则两者端口不可以相同),名字 SparkWorker01。

2.输入 Master 的信息

输入IP地址127.0.0.1,端口10005,名字 SparkMaster01,点击ok即可。

3.关闭端口,启动 Master

运行结果显示“ .tcp://SparkMaster@127.0.0.1:10005”,表明端口一致。显示Master服务器启动了,并开始了定时检测worker心跳的任务。

4.启动 Worker

运行代码后发现代码无误,Worker 开始向 Master 发送心跳,在 Master 端开始检测到有1个存活的 Worker。

5.修改Worker的参数,启动第二个Worker

点击 Edit configurations,点击 program arguments进入,不要改动主机(Master)的参数,将端口改成10002,名称改为 SparkWorker02,保持不与之前的 Worke 冲突。点击 OK,再次启动 Worker,即启动的第二个Worker。

运行代码之后,发现该 Worker注册成功,并开始发送心跳。Master 端检测到了2个存活的 Worker,并不断更新。

6.修改 Worker 的参数,启动第三个 Worker

操作同上,不要修改主机的参数,将端口改成10003,名称改为 SparkWorker03。点击 OK,再次启动 Worker,即启动的第三个 Worker。运行代码之后,发现该Worker注册成功,并开始发送心跳。Master 端检测到了3个存活的 Worker,并不断更新。

当关闭其中某一个 Worker时,过一段时间Master会检测该变化,显示有2个存活的Worker;再关闭一个 Worker时,过一段时间Master会检测该变化,显示有1个存活的 Worker;将剩余的 Worker也关闭时,过一段时间Master会检测该变化,显示有0个存活的 Worker。


四、课后作业

在课后,可以将 Linux(cent OS)打开,把服务器部署到cent OS上,在Windows中启动两个 Worker,因为一般的机器难以负荷3个 cent OS同时打开。然后使用课上学习过的代码进行编译修改完成这一任务。


五、板书与总结

指定 Master 与 Worker 的启动参数

1.功能要求

Master, Worker的启动参数运行时的指定,而不是固定写在程序中。

2.代码实现(修改内容)

(1)修改了 SparkMaster.scala

def main(args: Array[string]): Unit = {

if(args.length !=3) {

println("请输入参数:host port sparkMasterAcotor名字")

sys.exit()

}

val host =args(0)

val port =args(1)

val name =args(2)

(2)修改了 SparkWorker.scala

①第一部分:

if(args.length !=4){

printin("请输入参数:workerHost  workerPort workerName masterHost  masterPort  masterName")

sys.exit()

}

val workerHost = args(0)

val workerPort = args(1)

val workerName = args(2)

val masterHost =args(3)

val masterPort =args(4)

val masterName = args(5)

②第二部分

对构造器进行了修改,即增加了一个参数MasterName。修改后的代码为class Sparkworker(masterHost:string,masterPort:Int,masterName:string)extendsActor{ 及.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}。

3、参数指定(以 SparkMaster 参数指定为例)

(1)点击“SparkMaster”,选择 Edit configurations;

(2)点击 program arguments 进行参数设置;

(3)点击确定应用即可;

(4)若要运行第二个 Worker 服务,则要修改参数,再次运行。

相关文章
|
设计模式 分布式计算 Scala
Spark Master 和 Worker 项目需求 | 学习笔记
快速学习 Spark Master 和 Worker 项目需求
82 0
Spark Master 和 Worker 项目需求 | 学习笔记
|
分布式计算 网络协议 Scala
Spark worker 注册功能完成 | 学习笔记
快速学习 Spark worker 注册功能完成
144 0
Spark worker 注册功能完成 | 学习笔记
zookepeer启动节点报错,unable to run quorum server
zookepeer启动节点报错,unable to run quorum server
231 0
ZK集群启动停止脚本
ZK集群启动停止脚本
188 0
|
Docker Perl 容器
使用 kubeadm 初始化 worker节点出现 not ready 故障
使用 kubeadm 初始化 worker节点出现 not ready 故障
830 1
使用 kubeadm 初始化 worker节点出现 not ready 故障
Giraph源码分析(四)—— Master 如何检查Worker启动成功
本文的目的 说明Giraph如何借助ZooKeeper来实现Master与Workers间的同步(不太确定)。 环境 在单机上(机器名:giraphx)启动了2个workers。 Giraph遵从单Master多Workers结构,BSPServiceMaster使用MasterThread线程来进行全局的同步。
Giraph源码分析(二)—启动Master/Worker服务
作者 | 白松 注:本文为原创,引用转载需与数澜联系。 1、org.apache.giraph.bsp.CentralizedService 接口 功能:Basic service interface shared by both CentralizedServiceMaster and CentralizedServiceWorker.
|
MySQL 关系型数据库
master/slave 相同server_id引起的同步失败
昨天在做MySQL SwitchOver遇到一个诡异的想象,切换前后的结构图如下: 当我把一切都切换好之后,应其他需求,重启了04上的mysql,然后show slave status\G发现报错: Last_IO_Error: Fatal error: The slav...
934 0
|
分布式计算 资源调度 Spark
Spark Master资源调度--worker向master注册
Spark Master资源调度–Worker向Master注册 更多资源 github: https://github.
1118 0
|
分布式计算 Apache Spark
Spark Master启动源码分析
Spark Master启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
954 0