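3.4 Important Akka Programming APIs
Implementing an Akka Actor class
- Extend Actor (note: import Actor from the akka.actor package)
- Implement the receive method. Handle messages directly inside receive; there is no need to call loop or react. Akka calls receive automatically to deliver each message
- [Optional] You can also implement the preStart() method. It runs after the Actor object has been constructed, once each time the Actor is started (by default it is called again if the Actor is restarted)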
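The three points above can be sketched as follows. This is a minimal illustration that assumes the classic akka-actor library is on the classpath; the class name GreetingActor and the message strings are hypothetical.

```scala
import akka.actor.Actor

// Hypothetical actor used only for illustration.
class GreetingActor extends Actor {

  // Optional hook: called when the actor is started, before any message is processed.
  override def preStart(): Unit =
    println("GreetingActor started")

  // Akka invokes receive for each incoming message; no loop/react call is needed.
  override def receive: Receive = {
    case "hello" => println("received hello")
    case other   => println(s"unhandled message: $other")
  }
}
```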
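ActorSystem
In Akka, an ActorSystem is a heavyweight structure that allocates multiple threads, so in a real application the ActorSystem is usually a singleton. A single ActorSystem can create many Actors, and it is responsible for creating and supervising them.
Creating an Akka Actor
- To create an Akka Actor, you must first create an ActorSystem. The ActorSystem needs a name, and it can optionally load configuration (used later in these notes)
- Call ActorSystem.actorOf(Props(Actor object), "Actor name") to create the Actor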
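A minimal sketch of these two steps, assuming classic akka-actor with its default configuration. EchoActor, the system name demoSystem, and the actor name echoActor are hypothetical names chosen for the example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical actor used only for illustration.
class EchoActor extends Actor {
  override def receive: Receive = {
    case msg => println(s"echo: $msg")
  }
}

object ActorSystemDemo {
  def main(args: Array[String]): Unit = {
    // One ActorSystem per application is typical; the name is arbitrary.
    val system = ActorSystem("demoSystem")

    // actorOf creates the actor under /user and returns an ActorRef to it.
    val echo: ActorRef = system.actorOf(Props(new EchoActor), "echoActor")

    // Asynchronous, fire-and-forget send.
    echo ! "hi"

    // Crude wait so the demo message is processed before shutting down.
    Thread.sleep(500)
    system.terminate()
  }
}
```

Creating the actor with Props(new EchoActor) rather than from a singleton object means each actorOf call gets a fresh instance.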
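Obtaining a reference to an Actor
- Inside an actor, the actorSelection method of the context object looks up an actor by its path. It returns an ActorSelection, which can be used like an ActorRef to send messages
- Once you have the reference, you can send messages to it
Getting the ActorSystem inside an Actor
Use context.system to get a reference to the ActorSystem that manages the Actor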
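The lookup and the context.system access can be sketched as below, again assuming classic akka-actor. PingActor, PongActor, and the names passed to actorOf are hypothetical.

```scala
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}

// Hypothetical actor: prints the name of the ActorSystem that manages it.
class PongActor extends Actor {
  override def receive: Receive = {
    case "ping" => println(s"pong from system '${context.system.name}'")
  }
}

// Hypothetical actor: looks up PongActor by path and sends it a message.
class PingActor extends Actor {
  override def receive: Receive = {
    case "start" =>
      // Actors created with system.actorOf live under /user.
      val pong: ActorSelection = context.actorSelection("/user/pongActor")
      pong ! "ping"
  }
}

object SelectionDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("selectionDemo")
    system.actorOf(Props(new PongActor), "pongActor")
    val ping = system.actorOf(Props(new PingActor), "pingActor")
    ping ! "start"
    // The system keeps running until system.terminate() is called.
  }
}
```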
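Actor Path
Every Actor has a path, much like the mapping of a Controller/Handler written with Spring MVC, and this path can be referenced from outside. The path format is as follows:
Actor type | Path | Example |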
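3.5 Akka scheduled tasks: scheduler
How can an Actor run a task periodically?
Akka provides a scheduler object for timed scheduling. Calling ActorSystem.scheduler.schedule starts a periodic task. For Scala, schedule comes in two forms:
Form 1: send a message to a given Actor
    def schedule(
        initialDelay: FiniteDuration,          // how long to wait before the first run
        interval: FiniteDuration,              // how often to run
        receiver: ActorRef,                    // the Actor to send the message to
        message: Any)                          // the message to send
        (implicit executor: ExecutionContext)  // implicit parameter: an ExecutionContext must be imported manually
Form 2: pass in a user-defined function
    def schedule(
        initialDelay: FiniteDuration,          // how long to wait before the first run
        interval: FiniteDuration               // how often to run
    )(f: => Unit)                              // the function to run periodically; put your logic here
     (implicit executor: ExecutionContext)     // implicit parameter: an ExecutionContext must be imported manually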
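Example 1:
    import java.util.concurrent.TimeUnit

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.concurrent.duration.FiniteDuration

    object SechdulerActor extends Actor {
      override def receive: Receive = {
        case "timer" => println("Message received...")
      }
    }

    object AkkaSchedulerDemo {
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
        val senderActor: ActorRef = actorSystem.actorOf(Props(SechdulerActor), "sechdulerActor")

        // brings the implicit ExecutionContext into scope
        import actorSystem.dispatcher

        // send "timer" to senderActor immediately, then once per second
        actorSystem.scheduler.schedule(
          FiniteDuration(0, TimeUnit.SECONDS),
          FiniteDuration(1, TimeUnit.SECONDS),
          senderActor,
          "timer"
        )
      }
    }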
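Example 2:
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    object SechdulerActor extends Actor {
      override def receive: Receive = {
        case "timer" => println("Message received...")
      }
    }

    object AkkaSchedulerDemo {
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
        val senderActor: ActorRef = actorSystem.actorOf(Props(SechdulerActor), "sechdulerActor")

        // brings the implicit ExecutionContext into scope
        import actorSystem.dispatcher
        // enables the 0.seconds / 1.second duration syntax
        import scala.concurrent.duration._

        // run the block immediately, then once per second
        actorSystem.scheduler.schedule(0.seconds, 1.second) {
          senderActor ! "timer"
        }
      }
    }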
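- Import TimeUnit from the java.util.concurrent package
- Manually import actorSystem.dispatcher to supply the implicit ExecutionContext
- FiniteDuration(0, TimeUnit.SECONDS) can be replaced with 0.millis, provided the implicit conversions are imported first with import scala.concurrent.duration._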
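The equivalence between the explicit FiniteDuration constructor and the duration syntax can be checked with plain Scala, without Akka. The object name DurationDemo and its field names are hypothetical.

```scala
import scala.concurrent.duration._

object DurationDemo {
  // Explicit constructor form, as in Example 1.
  val explicitDelay: FiniteDuration =
    FiniteDuration(1, java.util.concurrent.TimeUnit.SECONDS)

  // Syntax enabled by importing scala.concurrent.duration._
  val dslDelay: FiniteDuration = 1.second

  // A zero delay is the same duration whatever unit it is written in.
  val zeroMillis: FiniteDuration = 0.millis
  val zeroSeconds: FiniteDuration =
    FiniteDuration(0, java.util.concurrent.TimeUnit.SECONDS)
}
```

Both forms produce values that compare equal, so either can be passed to schedule.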
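4 Implementing communication between two processes
4.1 Case overview
Requirement: use Akka to send and receive messages between two processes. After the Worker starts, it connects to the Master and sends a message; when the Master receives the message, it replies to the Worker.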
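4.2 Master implementation
Steps:
- Create two objects, one for each of the two ActorSystems
- Create an Actor in each of them (MasterActor on the Master side)
- Send and receive messages
- Start both processes and test
To support remote communication, the ActorSystem must be created with the following configuration:
    // prepare the configuration text
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "IP address"
         |akka.remote.netty.tcp.port = "port number"
      """.stripMargin

    // parse the configuration text into a Config object with ConfigFactory
    val config = ConfigFactory.parseString(configStr)

    // create the ActorSystem
    val masterActorSystem = ActorSystem("masterActorSystem", config)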
Sample code:
Master
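A minimal sketch of the Master side, inferred from what the Worker does: it sends "connect" to akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor and expects "success" in reply. The names MasterActor and Master and the log text are assumptions, and the sketch assumes akka-remote (classic remoting) is on the classpath.

```scala
import java.util.logging.Logger

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Assumed Master-side actor: replies "success" to every "connect" it receives.
class MasterActor extends Actor {
  val logger = Logger.getLogger("MasterActor")

  override def receive: Receive = {
    case "connect" =>
      logger.info("Worker connected")
      sender() ! "success" // reply to whoever sent "connect"
    case _ => () // ignore anything else
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    // Host and port must match the path the Worker selects.
    val configStr =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "127.0.0.1"
        |akka.remote.netty.tcp.port = "8888"
      """.stripMargin
    val config = ConfigFactory.parseString(configStr)

    // System name and actor name must also match the Worker's path.
    val masterActorSystem = ActorSystem("masterActorSystem", config)
    masterActorSystem.actorOf(Props(new MasterActor), "masterActor")
  }
}
```

Start the Master before the Worker so that the Worker's "connect" message has somewhere to go.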
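- Logger is the logging API that ships with the JRE (java.util.logging). Call Logger.getLogger("logger name") to get a logger, then use info to write log output
- The stripMargin method of String removes the leading | (and any whitespace before it) from each line of the string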
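Both notes can be tried out with plain Scala and the JDK alone. The object name StripMarginDemo and the sample text are hypothetical.

```scala
import java.util.logging.Logger

object StripMarginDemo {
  // Continuation lines start with '|'; stripMargin drops the leading
  // whitespace and the '|' itself, leaving only the text after it.
  val text: String =
    """first line
      |second line
      |third line""".stripMargin

  def main(args: Array[String]): Unit = {
    // java.util.logging writes INFO records to the console by default.
    val logger = Logger.getLogger("StripMarginDemo")
    logger.info(text)
  }
}
```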
4.3 Worker implementation
    import java.util.logging.Logger

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    object WorkerActor extends Actor {
      val logger = Logger.getLogger("WorkerActor")

      override def receive: Receive = {
        case "setup" =>
          logger.info("Worker started successfully")
          // look up the remote MasterActor by its path and connect to it
          val masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
          masterActorRef ! "connect"
        case "success" =>
          logger.info("Connected to Master successfully")
        case _ => () // ignore any other message
      }
    }

    object Worker {
      def main(args: Array[String]): Unit = {
        // prepare the configuration text
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "127.0.0.1"
             |akka.remote.netty.tcp.port = "9999"
          """.stripMargin

        // parse the configuration text into a Config object with ConfigFactory
        val config = ConfigFactory.parseString(configStr)

        // create the ActorSystem
        val workerActorSystem = ActorSystem("workerActorSystem", config)
        val workerActor = workerActorSystem.actorOf(Props(WorkerActor), "workerActor")
        workerActor ! "setup"
      }
    }
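5 A simplified Spark communication framework example
Case overview:
This case is implemented in four stages:
- Worker registration stage
  The Worker process registers with the Master (it sends its ID, number of CPU cores, and memory size in MB to the Master)
- Worker periodic heartbeat stage
  The Worker periodically sends heartbeat messages to the Master
- Master periodic heartbeat check stage
  The Master periodically checks the Workers' heartbeats, removes the Workers that have timed out, and sorts the remaining Workers by memory in descending order
- Multi-Worker test stage
  Extract the Worker parameters and accept them from the command line (bound port, CPU cores, memory)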
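5.1 Implementing the Worker registration stage
- Create the Master and Worker ActorSystems, together with MasterActor and WorkerActor
- Create the RegisterMessage and RegisterSuccessMessage message case classes
- The Worker sends a registration message to the Master; the Master saves the Worker's information and replies with a registration-success message
Create UserMessage.scala to hold the message types
    /**
     * Registration message
     * @param workerId ID of the Worker
     * @param cores    number of CPU cores
     * @param memory   memory (in MB)
     */
    case class RegisterMessage(workerId: String, cores: String, memory: String)

    /**
     * Registration-success message
     */
    case object RegisterSuccessMessage
Create UserEntity.scala to hold the entity classes
    case class WorkerInfo(workerId: String, cores: String, memory: String)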
Create SparkMaster.scala to build the MasterActorSystem and MasterActor
    import java.util.logging.Logger

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    object MasterActor extends Actor {
      val logger = Logger.getLogger("MasterActor")
      // holds the information of every registered Worker, keyed by workerId
      val workerMap = scala.collection.mutable.Map[String, WorkerInfo]()

      override def preStart(): Unit = {
        logger.info("Master started successfully")
      }

      override def receive: Receive = {
        case RegisterMessage(workerId, cores, memory) =>
          logger.info(s"New worker: workerId=$workerId, CPU cores=$cores, memory=$memory")
          // save the new worker in the map
          workerMap += (workerId -> WorkerInfo(workerId, cores, memory))
          // reply with the registration-success message
          sender() ! RegisterSuccessMessage
        case _ => () // ignore any other message
      }
    }

    object SparkMaster {
      def main(args: Array[String]): Unit = {
        // prepare the configuration text
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "127.0.0.1"
             |akka.remote.netty.tcp.port = "8888"
          """.stripMargin

        // parse the configuration text into a Config object with ConfigFactory
        val config = ConfigFactory.parseString(configStr)

        // create the ActorSystem
        val masterActorSystem = ActorSystem("masterActorSystem", config)
        val masterActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")
      }
    }
Create SparkWorker.scala to build the WorkerActorSystem and WorkerActor
    import java.util.UUID
    import java.util.logging.Logger

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    object WorkerActor extends Actor {
      val logger = Logger.getLogger("WorkerActor")

      override def preStart(): Unit = {
        val masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
        // send the registration message to the Master
        masterActorRef ! RegisterMessage(UUID.randomUUID().toString, "4", "1024")
      }

      override def receive: Receive = {
        case RegisterSuccessMessage =>
          logger.info("Registration succeeded!")
        case _ => () // ignore any other message
      }
    }

    object SparkWorker {
      def main(args: Array[String]): Unit = {
        // prepare the configuration text
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "127.0.0.1"
             |akka.remote.netty.tcp.port = "9101"
          """.stripMargin

        // parse the configuration text into a Config object with ConfigFactory
        val config = ConfigFactory.parseString(configStr)

        // create the ActorSystem
        val workerActorSystem = ActorSystem("workerActorSystem", config)
        val workerActorRef = workerActorSystem.actorOf(Props(WorkerActor))
      }
    }
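5.2 Worker periodic heartbeat stage
When should the Worker start sending heartbeats to the Master: as soon as it starts, or only after it has received the registration-success message from the Master?
It should start sending heartbeats only after it has received the registration-success message from the Master.
When the Master receives a heartbeat from a Worker, it must update that Worker's last heartbeat time.
Sample code:
Add the heartbeat message to UserMessage.scala
    case class HeartBeat(workerId: String)
Add the periodic heartbeat logic to SparkWorker.scala:
    // workerId and masterActorRef must be fields of the actor so that they are visible here
    override def receive: Receive = {
      case RegisterSuccessMessage =>
        logger.info("Registration succeeded!")
        logger.info("Sending heartbeat messages...")

        import scala.concurrent.duration._
        import context.dispatcher

        // send a heartbeat to the Master every 3 seconds
        context.system.scheduler.schedule(0.seconds, 3.seconds) {
          masterActorRef ! HeartBeat(workerId)
        }
      case _ => () // ignore any other message
    }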
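Add a last heartbeat time field to the WorkerInfo case class
    case class WorkerInfo(workerId: String, cores: String, memory: String, var lastHearBeatTime: Date)
Add the heartbeat handling logic to SparkMaster.scala:
    override def receive: Receive = {
      case RegisterMessage(workerId, cores, memory) =>
        logger.info(s"New worker: workerId=$workerId, CPU cores=$cores, memory=$memory")
        // save the new worker in the map; registration counts as the first heartbeat
        workerMap += (workerId -> WorkerInfo(workerId, cores, memory, new Date))
        // reply with the registration-success message
        sender() ! RegisterSuccessMessage
      case HeartBeat(workerId) =>
        logger.info(s"Received heartbeat from Worker: $workerId...")
        // update the last heartbeat time of the corresponding worker, if it is registered
        workerMap.get(workerId).foreach { info => info.lastHearBeatTime = new Date }
      case _ => () // ignore any other message
    }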
5.3 Master periodic heartbeat check stage
If a worker has terminated unexpectedly, the Master needs to remove it from the current set of Workers. The Master can again use an Akka scheduled task to check for heartbeat timeouts.
When should the Master start the periodic heartbeat timeout check?
It can start as soon as the Master starts.
SparkMaster.scala periodically removes the Workers whose heartbeat has timed out:
    override def preStart(): Unit = {
      logger.info("Master started successfully")
      logger.info("Starting heartbeat timeout check...")

      import scala.concurrent.duration._
      import context.dispatcher

      // run the check every 3 seconds
      context.system.scheduler.schedule(0.seconds, 3.seconds) {
        // collect the workers whose last heartbeat is more than 10 seconds old
        val now = new Date
        val timeoutWorkerId: List[String] = workerMap.toList.filter { workInfo =>
          now.getTime - workInfo._2.lastHearBeatTime.getTime > 10 * 1000
        }.map(_._1)

        if (timeoutWorkerId.nonEmpty) {
          timeoutWorkerId.foreach { workerId =>
            logger.warning(s"Removing timed-out worker: $workerId...")
          }
          // remove all timed-out Workers
          workerMap --= timeoutWorkerId

          // remaining Workers sorted by memory, largest first
          // (memory is stored as a String, so compare it numerically)
          val sortedWorkInfo = workerMap.toList.sortWith { (idAndInfo1, idAndInfo2) =>
            idAndInfo1._2.memory.toInt > idAndInfo2._2.memory.toInt
          }.map(_._2)
          println(sortedWorkInfo)
        }
      }
    }
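The timeout filter and the descending sort can also be written as pure functions, which makes them easy to check without starting an ActorSystem. The sketch below is not the SparkMaster code: WorkerStatus is a simplified, hypothetical stand-in for WorkerInfo that stores memory as an Int and the last heartbeat as epoch milliseconds, and the object and method names are assumptions.

```scala
// Hypothetical, simplified stand-in for WorkerInfo.
final case class WorkerStatus(workerId: String, cores: Int, memoryMb: Int, lastHeartBeatMillis: Long)

object HeartbeatCheck {

  // IDs of the workers whose last heartbeat is older than timeoutMillis.
  def timedOut(workers: Map[String, WorkerStatus], nowMillis: Long, timeoutMillis: Long): List[String] =
    workers.collect {
      case (id, w) if nowMillis - w.lastHeartBeatMillis > timeoutMillis => id
    }.toList

  // Workers that are still alive, largest memory first.
  def aliveByMemoryDesc(workers: Map[String, WorkerStatus], nowMillis: Long, timeoutMillis: Long): List[WorkerStatus] = {
    val dead = timedOut(workers, nowMillis, timeoutMillis).toSet
    workers
      .filter { case (id, _) => !dead.contains(id) }
      .values
      .toList
      .sortBy(w => -w.memoryMb)
  }
}
```

Passing the current time in as a parameter, instead of reading the clock inside the function, keeps the result deterministic for a given input.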
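5.4 Multi-Worker test stage
To start several workers, the port number, number of CPU cores, and memory must be passed to the Worker's main method as command-line arguments and then used for initialization.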
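Reading args(0), args(1), and args(2) directly fails with an exception when an argument is missing. One possible way to validate the three arguments first is sketched below; WorkerArgs, WorkerArgsParser, and the usage text are hypothetical and are not part of the SparkWorker code.

```scala
import scala.util.Try

// Hypothetical holder for the three command-line parameters.
final case class WorkerArgs(port: Int, cores: String, memory: String)

object WorkerArgsParser {

  // Returns Right(parsed arguments) or Left(error message).
  def parse(args: Array[String]): Either[String, WorkerArgs] = args match {
    case Array(port, cores, memory) =>
      Try(port.toInt).toOption match {
        case Some(p) if p > 0 && p <= 65535 => Right(WorkerArgs(p, cores, memory))
        case _                              => Left(s"invalid port: $port")
      }
    case _ =>
      Left("usage: SparkWorker <port> <cores> <memoryMB>")
  }
}
```

A main method could match on the result and print the Left message before exiting, instead of starting the ActorSystem with incomplete settings.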
SparkWorker.scala code
Here the object is changed to a class, because a class makes it easier to accept the two parameters cores and memory
    import java.util.UUID
    import java.util.logging.Logger

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    class WorkerActor(cores: String, memory: String) extends Actor {
      val workerId = UUID.randomUUID().toString
      val logger = Logger.getLogger("WorkerActor")
      var masterActorRef: ActorSelection = _

      override def preStart(): Unit = {
        masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
        // send the registration message to the Master
        masterActorRef ! RegisterMessage(workerId, cores, memory)
        logger.info(s"CPU cores: $cores, memory: $memory (MB)")
      }

      override def receive: Receive = {
        case RegisterSuccessMessage =>
          logger.info("Registration succeeded!")
          logger.info("Sending heartbeat messages...")

          import scala.concurrent.duration._
          import context.dispatcher

          // send a heartbeat to the Master every 3 seconds
          context.system.scheduler.schedule(0.seconds, 3.seconds) {
            masterActorRef ! HeartBeat(workerId)
          }
        case _ => () // ignore any other message
      }
    }

    object SparkWorker {
      def main(args: Array[String]): Unit = {
        // args(0) = port to bind, args(1) = CPU cores, args(2) = memory (MB)
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "127.0.0.1"
             |akka.remote.netty.tcp.port = "${args(0)}"
          """.stripMargin

        // parse the configuration text into a Config object with ConfigFactory
        val config = ConfigFactory.parseString(configStr)

        // create the ActorSystem
        val workerActorSystem = ActorSystem("workerActorSystem", config)
        val workerActorRef = workerActorSystem.actorOf(Props(new WorkerActor(args(1), args(2))))
      }
    }
UserEntity.scala: override the toString method of WorkerInfo
    import java.util.Date

    case class WorkerInfo(workerId: String, cores: String, memory: String, var lastHearBeatTime: Date) {
      override def toString: String = s"Memory: $memory - CPU cores: $cores"
    }