176 Scala 项目案例(项目实现)

简介: 176 Scala 项目案例(项目实现)
架构图

ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。

Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

  1. preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
  2. receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
Master类
package cn.itcast.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
/**
  * Master为整个集群中的主节点
  * Master继承了Actor
  */
class Master extends Actor{
  //保存WorkerID和Work信息的map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //保存所有Worker信息的Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker超时时间
  val WORKER_TIMEOUT = 10 * 1000
  //重新receive方法
  //导入隐式转换,用于启动定时器
  import context.dispatcher
  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //启动定时器,定时执行
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }
  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Worker向Master发送的注册消息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        sender ! RegisteredWorker("192.168.10.1")
      }
    }
    //Worker向Master发送的心跳消息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }
    //Master自己向自己发送的定期检查超时Worker的消息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("worker size: " + workers.size)
    }
  }
}
object Master {
  //程序执行入口
  def main(args: Array[String]) {
    val host = "192.168.10.1"
    val port = 8888
    //创建ActorSystem的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem是单例的,用来创建Actor
    val actorSystem = ActorSystem.create("MasterActorSystem", config)
    //启动Actor,Master会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Master], "Master")
  }
}
Worker类
package cn.itcast.spark
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
/**
  * Worker为整个集群的从节点
  * Worker继承了Actor
  */
class Worker extends Actor{
  //Worker端持有Master端的引用(代理对象)
  var master: ActorSelection = null
  //生成一个UUID,作为Worker的标识
  val id = UUID.randomUUID().toString
  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //Worker向MasterActorSystem发送建立连接请求
    master = context.system.actorSelection("akka.tcp://MasterActorSystem@192.168.10.1:8888/user/Master")
    //Worker向Master发送注册消息
    master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
  }
  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Master向Worker的反馈信息
    case RegisteredWorker(masterUrl) => {
      import context.dispatcher
      //启动定时任务,向Master发送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }
    case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
    }
  }
}
object Worker {
  def main(args: Array[String]) {
    val clientPort = 2552
    //创建WorkerActorSystem的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //启动Actor,Master会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}


目录
相关文章
|
6月前
|
并行计算 Scala
175 Scala 项目案例(Akka简介)
175 Scala 项目案例(Akka简介)
55 0
|
6月前
|
分布式计算 Hadoop Scala
174 Scala 项目案例(需求分析)
174 Scala 项目案例(需求分析)
17 0
|
分布式计算 Java Shell
安装SBT环境运行Scala项目
安装SBT环境运行Scala项目
496 0
安装SBT环境运行Scala项目
|
8月前
|
Scala
Idea中创建scala项目中Cannot resolve plugin org.scala-tools:maven-scala-plugin: unknown
Idea中创建scala项目中Cannot resolve plugin org.scala-tools:maven-scala-plugin: unknown
53 0
|
JSON Java 测试技术
在Scala项目中使用Spring Cloud
在Scala项目中使用Spring Cloud
在Scala项目中使用Spring Cloud
|
分布式计算 小程序 IDE
IDEA 开发 Scala 项目|学习笔记
快速学习 IDEA 开发 Scala 项目。
205 0
IDEA 开发 Scala 项目|学习笔记
|
Java Scala Maven
Maven - Scala/Java 项目添加自己的 jar 包
一.引言 scala / java 项目引用非官方依赖 jar 包时,需要自定义并打入最终的 jar 包,经过试验以下方案可以实现。 二.添加 jar 包到 maven 库 ???? 第三方自定义 jar 包可以添加到本地 maven 库中,随后即可 mvn package 打入到最终的项目 jar 包中,该方法最方便。创建 install.sh 文件,jar_path 为第三方自定义 jar 包在设备的位置,groupId、artifactId 和 版本号 version 自己定义,执行脚本后
358 0
Maven - Scala/Java 项目添加自己的 jar 包
|
分布式计算 Java Hadoop
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
step6 创建scala object step7 修改pom文件 step8 配置项目 step9 添加依赖库(Spark的jar包) step10 设置输入路径
142 0
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
|
分布式计算 IDE Java
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)
写在前面 step1 下载Scala IDE step2 解压安装Scala IDE step3 Scala 下载 step4 Scala 配置 step5 创建scala项目
128 0
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)
|
3月前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
112 2