spark之master与worker通信模型讲解

简介:

通信模型架构图

wKioL1lcy9uzX8myAAFZXwNSxTw417.png-wh_50

master 端代码
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// 需要导入这2个包 封装一些属性。
class MasterActor extends Actor {

  //在开始之前调用一次
  override def preStart(): Unit = {

  }

  //用于接收消息
  override def receive: Receive = {
    case "started" => {
      println("Master has been started!")
      //进入这个分支,说明这个Master线程已经启动完成
    }
    case "connecting" => {
      println("Master has been get connect from Worker!")
      println("a Worker Node has been register!")
      //返回消息给Worker
      sender() ! "connected"
      Thread.sleep(1000)
    }

    case "stoped" => {

    }
  }

}

object Demo01MasterActor {

  def main(args: Array[String]) {

    //设置MasterIP和端口
    val masterHost = "localhost"
    val masterPort = "1234"

    //端口和IP封装到akka架构,获取一个属性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$masterHost"
         |akka.remote.netty.tcp.port = "$masterPort"
  """.stripMargin

    val config = ConfigFactory.parseString(conStr)
    val masterActorSystem = ActorSystem("MasterActorSystem", config)
    val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor")
    masterActor ! "started"
    masterActorSystem.awaitTermination();

  }

}

worker端代码
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class WorkerActor extends Actor {
  var masterURL: ActorSelection = null

  //启动Actor之前执行,做初始化工作
  override def preStart(): Unit = {
    //配置访问Master的URL
    //MasterIP:localhost
    //MasterPort:8888(根据Master配置)
    //Master的 ActorSystem对象:MasterActorSystem、MasterActor
    masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
  }

  override def receive: Receive = {
    case "started" => {
      println("Worker has been started!")
      //进入这个分支,说明这个Worker线程已经启动完成
      //可以去向Master注册

      //请求和Master建立连接
      masterURL ! "connecting"
    }
    case "connected" => {
      println("Worker 收到来自Master确认信息!")
    }
    case "stoped" => {

    }
  }


}

object Demo01WorkerActor {

  def main(args: Array[String]) {
    //初始化MastereIP和端口、WorkerIP和端口

    //    val masterHost = args(0)
    //    val masterPort = args(1)
    //    val workerHost = args(2)
    //    val workePort = args(3)

    val masterHost = "localhost"
    val masterPort = "8888"

    val workerHost = "localhost"
    val workePort = "8889"


    //端口和IP封装到akka架构,获取一个属性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$workerHost"
         |akka.remote.netty.tcp.port = "$workePort"
  """.stripMargin

    val config = ConfigFactory.parseString(conStr)
    val workerActorSystem = ActorSystem("WorkerActorSystem", config)
    val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor")
    workerActor ! "started"
    workerActorSystem.awaitTermination();


  }

}
本文转自  ChinaUnicom110  51CTO博客,原文链接:http://blog.51cto.com/xingyue2011/1944836

相关文章
|
6月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
157 0
|
6月前
|
SQL 存储 分布式计算
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
1079 0
|
6月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
223 0
|
1月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
37 4
|
21天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
4月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
5月前
|
机器学习/深度学习 分布式计算 算法
基于Spark中随机森林模型的天气预测系统
基于Spark中随机森林模型的天气预测系统
125 1
|
6月前
|
Python
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
294 0
|
SQL 分布式计算 算法
【大数据处理框架】Spark大数据处理框架,包括其底层原理、架构、编程模型、生态圈
【大数据处理框架】Spark大数据处理框架,包括其底层原理、架构、编程模型、生态圈
510 0
|
分布式计算 搜索推荐 算法
大数据Spark MLlib基于模型的协同过滤
大数据Spark MLlib基于模型的协同过滤
88 0
下一篇
无影云桌面