指定 Master 与 Worker 的启动参数 | 学习笔记

简介: 快速学习指定 Master 与 Worker 的启动参数

开发者学堂课程【Scala 核心编程 - 进阶指定 Master 与 Worker 的启动参数学习笔记,与课程紧密连接,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/610/detail/9127


指定 Master 与 Worker 的启动参数


内容介绍:

一、功能要求

二、代码实现

三、代码运行

四、课后作业

五、板书与总结


一、功能要求

目前我们代码还存在一定问题,worker 一旦启用,其运行的参数都是固定的。

如 Master 的 IP 地址是指定的,端口也是指定的,而这显然不合理的,因为在将来部署的时候,不可以允许其他人来修改源代码。所以涉及到的灵活性的参数,就应该在运行时将其做成一个参数传入再运行。

本节课要完成的任务 Master, Worker 的启动参数运行时的指定,而不是固定写在程序中。


二、代码实现

1、先改动 master

(1)原代码:

object sparkMaster{

def main(args: Array[string]): Unit = {

val config = ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider

|akka.remote.netty.tcp.hostname=127.0.0.1

|akka.remote.netty.tcp.port=10002

""".stripMargin)

val sparkMastersystem=Actorsystem("SparkMaster",config)

val sparkMasterRef=sparkMastersystem.actorof(Props[Spa

rkMaster],"SparkMaster-01")

sparkMasterRef !"start"

}

}

先分析一下应该将哪些参数做成动态指定,首先host应该指定一下,而不是固定的127.0.0.1;端口port也是可以改变的;甚至名字"sparkMaster"也可以改定,如一台机器可以运行多个Master也是有的;"SparkMaster-01"也是可以改定的。因此,共有4个参数可做成动态参数,并将其指定。

刚刚我们分析出有四个参数,这里我们可以灵活处理,暂时将SparkMaster值固定,也就是说这里我们要指定三个参数,即host、port和SparkMasterActor。

(2)修改后的代码:

object sparkMaster{

def main(args: Array[string]): Unit = {

//参数判断

if(args.length !=3) {

println("请输入参数:host port sparkMasterAcotor名字")

sys.exit()

//不满足条件即退出程序

}

//满足条件,在其中定义值

val host =args(0)

val port =args(1)

val name =args(2)

val config = ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider

|akka.remote.netty.tcp.hostname=$(host)

|akka.remote.netty.tcp.port=$(port)

""".stripMargin)

val sparkMastersystem=Actorsystem("SparkMaster",config)

val sparkMasterRef=sparkMastersystem.actorof(Props[Sp

arkMaster],s"$(name)")

sparkMasterRef !"start"

}

}

2、改动 worker

(1)原代码:

object SparkWorker {

def main(args: Array[string]):Unit ={

val workerHost = "127.0.0.1"

val workerport = 10001

val masterHost = "127.0.0.1"

val masterport = 10005

val config=ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=127.0.0.1 |akka.remote.netty.tcp.port=10002

""".stripMargin)

val sparkWorkersystem=Actorsystem("sparkWorker",config)

val sparkWorkerRef=sparkWorkerSystem.actorof(Props(ne

w SparkWorker(masterHost, masterPort)),"Sparkworker-01")

先来分析一共有几个参数要指定,至少应包含workerHost、workerport 、masterHost 与masterport 4个参数。

此外相关的名字也应指定,再增加SparkWorker-01与SparkMaster-01的名字,因此,至少有六个参数是待指定的。

(2)修改后的代码

object SparkWorker {

def main(args: Array[string]):Unit ={

if(args.length !=4){

printin("请输入参数:workerHost  workerPort workerName masterHost  masterPort  masterName")

sys.exit()

}

val workerHost = args(0)

val workerPort = args(1)

val workerName = args(2)

val masterHost =args(3)

val masterPort =args(4)

val masterName = args(5)

val config=ConfigFactory.parsestring(

s"""

|akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=${workerHost} |akka.remote.netty.tcp.port=${workerPort}

""".stripMargin)

val sparkWorkersystem=Actorsystem("sparkWorker",config)//val sparkWorkerRef=sparkWorkerSystem.actorof(Props (new SparkWorker(masterHost,masterPort)),"Sparkwork

er-01")

//该代码显示错误,因为值类型不匹配,一方需要Int,一方需要

String。且"Sparkworker-01"也需要替换,因此,可改为:val sparkWorkerRef=sparkWorkerSystem.actorof(Props (new SparkWorker(masterHost, masterPort.to Int,master Name)),s"$(workerName)")

 //此代码仍存在问题,因为在构建Sparkworker的actorof时,原本的代码是固定的(.tcp://SparkMaster@${masterHost}:${masterPort}/user/"SparkMaster-01")    

println("masterProxy="+masterProxy)),因此应在前面的代码中增加一个参数

class Sparkworker(masterHost:string,masterPort:Int,mast

erName:string)extends Actor{

//masterName:string是增加的参数,原本的代码修改为.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}


三、代码运行

启动三个 worker 和一个 master 来运行。首先进行Master的设置,先找到SparkMaster 的 config,点击 Edit configurations,点击 program arguments 进入。

1.输入 Worker 的信息

输入 IP 地址127.0.0.1,端口10001(若 Master 与 Worker 在同一台机器上,则两者端口不可以相同),名字 SparkWorker01。

2.输入 Master 的信息

输入IP地址127.0.0.1,端口10005,名字 SparkMaster01,点击ok即可。

3.关闭端口,启动 Master

运行结果显示“ .tcp://SparkMaster@127.0.0.1:10005”,表明端口一致。显示Master服务器启动了,并开始了定时检测worker心跳的任务。

4.启动 Worker

运行代码后发现代码无误,Worker 开始向 Master 发送心跳,在 Master 端开始检测到有1个存活的 Worker。

5.修改Worker的参数,启动第二个Worker

点击 Edit configurations,点击 program arguments进入,不要改动主机(Master)的参数,将端口改成10002,名称改为 SparkWorker02,保持不与之前的 Worke 冲突。点击 OK,再次启动 Worker,即启动的第二个Worker。

运行代码之后,发现该 Worker注册成功,并开始发送心跳。Master 端检测到了2个存活的 Worker,并不断更新。

6.修改 Worker 的参数,启动第三个 Worker

操作同上,不要修改主机的参数,将端口改成10003,名称改为 SparkWorker03。点击 OK,再次启动 Worker,即启动的第三个 Worker。运行代码之后,发现该Worker注册成功,并开始发送心跳。Master 端检测到了3个存活的 Worker,并不断更新。

当关闭其中某一个 Worker时,过一段时间Master会检测该变化,显示有2个存活的Worker;再关闭一个 Worker时,过一段时间Master会检测该变化,显示有1个存活的 Worker;将剩余的 Worker也关闭时,过一段时间Master会检测该变化,显示有0个存活的 Worker。


四、课后作业

在课后,可以将 Linux(cent OS)打开,把服务器部署到cent OS上,在Windows中启动两个 Worker,因为一般的机器难以负荷3个 cent OS同时打开。然后使用课上学习过的代码进行编译修改完成这一任务。


五、板书与总结

指定 Master 与 Worker 的启动参数

1.功能要求

Master, Worker的启动参数运行时的指定,而不是固定写在程序中。

2.代码实现(修改内容)

(1)修改了 SparkMaster.scala

def main(args: Array[string]): Unit = {

if(args.length !=3) {

println("请输入参数:host port sparkMasterAcotor名字")

sys.exit()

}

val host =args(0)

val port =args(1)

val name =args(2)

(2)修改了 SparkWorker.scala

①第一部分:

if(args.length !=4){

printin("请输入参数:workerHost  workerPort workerName masterHost  masterPort  masterName")

sys.exit()

}

val workerHost = args(0)

val workerPort = args(1)

val workerName = args(2)

val masterHost =args(3)

val masterPort =args(4)

val masterName = args(5)

②第二部分

对构造器进行了修改,即增加了一个参数MasterName。修改后的代码为class Sparkworker(masterHost:string,masterPort:Int,masterName:string)extendsActor{ 及.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}。

3、参数指定(以 SparkMaster 参数指定为例)

(1)点击“SparkMaster”,选择 Edit configurations;

(2)点击 program arguments 进行参数设置;

(3)点击确定应用即可;

(4)若要运行第二个 Worker 服务,则要修改参数,再次运行。

相关文章
|
9月前
|
弹性计算 运维 监控
云产品评测 云服务诊断
作为一名开发工程师,我负责云资源的运维和管理。通过健康状态和诊断功能,可实时监控云资源(如ECS、网站等)的运行情况,快速排查并解决诸如访问异常、配置错误、安全风险等问题。诊断工具简化了问题定位流程,提供详细的修复建议,帮助用户在1-2分钟内完成诊断,迅速恢复业务。健康状态页面展示各实例的每小时健康数据,支持查看15天内的历史记录,极大提升了问题排查效率。
360 98
|
JSON JavaScript API
商品详情数据接口解析返回的JSON数据(API接口整套流程)
商品详情数据接口解析返回的JSON数据是API接口使用中的一个重要环节,它涉及从发送请求到接收并处理响应的整个流程。以下是一个完整的API接口使用流程,包括如何解析返回的JSON数据:
|
11月前
|
监控 Ubuntu Linux
使用VSCode通过SSH远程登录阿里云Linux服务器异常崩溃
通过 VSCode 的 Remote - SSH 插件远程连接阿里云 Ubuntu 22 服务器时,会因高 CPU 使用率导致连接断开。经排查发现,VSCode 连接根目录 ".." 时会频繁调用"rg"(ripgrep)进行文件搜索,导致 CPU 负载过高。解决方法是将连接目录改为"root"(或其他具体的路径),避免不必要的文件检索,从而恢复正常连接。
|
11月前
|
弹性计算 负载均衡 监控
slb配置健康检查
slb配置健康检查
317 2
|
10月前
|
数据可视化 DataX Python
Seaborn 教程-绘图函数
Seaborn 教程-绘图函数
337 8
|
11月前
|
缓存 运维 网络协议
深入Linux内核架构:操作系统的核心奥秘
深入Linux内核架构:操作系统的核心奥秘
502 2
|
机器学习/深度学习 人工智能 算法
软件测试的未来:AI与自动化的融合之路
随着技术的不断进步,人工智能(AI)和自动化技术在软件测试领域的应用日益增多。本文将探讨AI如何改变软件测试的面貌,以及自动化工具如何助力提升测试效率和准确性。我们将从实际案例出发,分析AI和自动化技术带来的优势与挑战,并展望未来软件测试的发展趋势。
|
消息中间件 监控 Java
惊呆了!Python性能测试高手都用这些神器:JMeter+Locust,效率翻倍📈
【8月更文挑战第4天】在软件开发中,性能测试至关重要。JMeter与Locust分别是跨平台与Python专用的性能测试利器。JMeter功能强大,支持多种协议,图形界面友好;而Locust则利用Python的协程实现高并发,代码清晰易维护。两者均可扩展,支持分布式测试。结合使用时,JMeter测试非Web服务,Locust专注Python Web服务,互补优势,大幅提升测试效率和准确性,为应用稳定运行提供坚实保障。
273 0
|
Web App开发 测试技术 iOS开发
一篇文章讲明白locust性能测试
一篇文章讲明白locust性能测试
404 0
|
UED 开发者 异构计算
浏览器刷新率是什么?
【5月更文挑战第29天】浏览器刷新率是什么?
608 1