Scala高阶函数与akka 2

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Scala高阶函数与akka

3.4 Akka编程重要API


实现Akka Actor类

  1. 继承Actor(注意:要导入akka.actor包下的Actor
  2. 实现receive方法,receive方法中直接处理消息即可,不需要添加loop和react方法调用。Akka会自动调用receive来接收消息
  3. 【可选】还可以实现preStart()方法,该方法在Actor对象构建后执行,在Actor声明周期中仅执行一次


ActorSystem

在Akka中,ActorSystem是一个重量级的结构,它需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,可以使用这个ActorSystem创建很多Actor。它负责创建和监督actor。

加载Akka Actor

  1. 要创建Akka的Actor,必须要先获取创建一个ActorSystem。需要给ActorSystem指定一个名称,并可以去加载一些配置项(后面会使用到)
  1. 调用ActorSystem.actorOf(Props(Actor对象), “Actor名字”)来加载Actor

获取ActorRef

  1. 在actor中,可以使用context上下文对象的actorSelection方法来获取ActorRef
  1. 获取到ActorRef就可以发送消息了

Actor中获取ActorSystem

直接使用context.system就可以获取到管理该Actor的ActorSystem的引用

Actor Path

每一个Actor都有一个Path,就像使用Spring MVC编写一个Controller/Handler一样,这个路径可以被外部引用。路径的格式如下:

Actor类型 路径 示例

image.png

3.5 Akka定时任务scheduler


如果我们想要在Actor定时的执行一些任务,该如何处理呢?

Akka中,提供一个scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule方法,可以启动一个定时任务。schedule方法针对scala提供两种使用形式:

第一种:直接指定Actor发送消息方式

def schedule(
    initialDelay: FiniteDuration,   // 延迟多久后启动定时任务
    interval: FiniteDuration,     // 每隔多久执行一次
    receiver: ActorRef,         // 给哪个Actor发送消息
    message: Any)           // 要发送的消息类型
(implicit executor: ExecutionContext) // 隐式参数:需要手动导入隐式转换

第二种:传入用户函数自定义实现方式

def schedule(
    initialDelay: FiniteDuration,     // 延迟多久后启动定时任务
    interval: FiniteDuration        // 每隔多久执行一次
)(f: ⇒ Unit)                // 定期要执行的函数,可以将逻辑写在这里
(implicit executor: ExecutionContext)   // 隐式参数:需要手动导入隐式转换

示例1:

object SechdulerActor extends Actor {
  override def receive: Receive = {
    case "timer" => println("收到消息...")
  }
}
object AkkaSchedulerDemo {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    val senderActor: ActorRef = actorSystem.actorOf(Props(SechdulerActor), "sechdulerActor")
    import actorSystem.dispatcher
    actorSystem.scheduler.schedule(
      FiniteDuration(0, TimeUnit.SECONDS),
      FiniteDuration(1, TimeUnit.SECONDS),
      senderActor,
      "timer"
    )
  }
}

示例2:

object SechdulerActor extends Actor {
  override def receive: Receive = {
    case "timer" => println("收到消息...")
  }
}
object AkkaSchedulerDemo {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    val senderActor: ActorRef = actorSystem.actorOf(Props(SechdulerActor), "sechdulerActor")
    import actorSystem.dispatcher
    import scala.concurrent.duration._
    actorSystem.scheduler.schedule(0 seconds, 1 seconds) {
      senderActor ! "timer"
    }
  }
}


要导入java.util.concurrent包下的TimeUnit

手动import actorSystem.dispatcher隐式参数

FiniteDuration(0, TimeUnit.SECONDS)可以使用 0 millis代替,但要提前导入import scala.concurrent.duration._隐式转换

4 实现两个进程之间的通信

4.1 案例介绍


需求:基于Akka实现在两个进程间发送、接收消息。Worker启动后去连接Master,并发送消息,Master接收到消息后,再回复Worker消息。

4.2 Master实现


实现步骤:

  1. 创建两个object,分别对应两个ActorSystem
  2. 分别创建两个Actor(MasterActor)
  3. 发送、接收消息
  4. 启动测试

为了支持远程通信,在创建ActorSystem时,需要指定如下配置:

    //准备配置文件信息
    val configStr=
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "IP地址"
        |akka.remote.netty.tcp.port = "端口号"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
  // 创建ActorSystem
  val masterActorSystem = ActorSystem("masterActorSystem",config)

示例代码:

Master

object WorkerActor extends Actor {
  val logger = Logger.getLogger("WorkerActor")
  override def receive: Receive = {
    case "setup" =>
      logger.info("Worker启动成功")
      val masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
      masterActorRef ! "connect"
    case "success" =>
      logger.info("连接Master成功")
    case _ => Unit
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    //准备配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "127.0.0.1"
         |akka.remote.netty.tcp.port = "9999"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
    // 创建ActorSystem
    val masterActorSystem = ActorSystem("workerActorSystem",config)
    val workerActor = masterActorSystem.actorOf(Props(WorkerActor), "workerActor")
    workerActor ! "setup"
  }
}
  1. Logger是JRE中自带的日志系统API,可以使用Logger.getLogger(“Logger名字”)来获取一个日志器,获取Logger后,可以使用info来输出日志
  2. String的stripMargin方法,可以将字符串前面的|自动删除

4.3 Worker实现

object WorkerActor extends Actor {
  val logger = Logger.getLogger("WorkerActor")
  override def receive: Receive = {
    case "setup" =>
      logger.info("Worker启动成功")
      val masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
      masterActorRef ! "connect"
    case "success" =>
      logger.info("连接Master成功")
    case _ => Unit
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    //准备配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "127.0.0.1"
         |akka.remote.netty.tcp.port = "9999"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
    // 创建ActorSystem
    val masterActorSystem = ActorSystem("workerActorSystem",config)
    val workerActor = masterActorSystem.actorOf(Props(WorkerActor), "workerActor")
    workerActor ! "setup"
  }
}

5 简易版spark通信框架案例

案例介绍:

本案例分为三个阶段来实现:

  1. Worker注册阶段
  • Worker进程向Master注册(将自己的ID、CPU核数、内存大小(M)发送给Master)
  1. Worker定时发送心跳阶段
    Worker定期向Master发送心跳消息
  2. Master定时心跳检测阶段
    Master定期检查Worker心跳,将一些超时的Worker移除,并对Worker按照内存进行倒序排序
  1. 多个Worker测试阶段
    抽取Worker参数,通过命令行参数接收Worker参数(绑定端口号、CPU、内存)

5.1 Worker注册阶段实现

创建Master/Worker ActorSystem,以及MasterActor、WorkerActor

创建RegiterMessage、RegisterSucessMessage消息样例类

Worker向Master发送注册消息,Master保存Worker信息,并给Worker回复注册成功消息

创建UserMessage.scala保存用户消息

/**
  * 注册消息
  * @param workerId
  * @param cores CPU核数
  * @param memory 内存(单位:M)
  */
case class RegisterMessage(workerId:String, cores:String, memory:String)
/**
  * 注册成功消息
  */
case object RegisterSuccessMessage

创建UserEntity.scala保存实体类

case class WorkerInfo(workerId:String, cores:String, memory:String)

创建SparkMaster.scala构建MasterActorSystem和MasterActor

object MasterActor extends Actor {
  val logger = Logger.getLogger("MasterActor")
  val workerMap = scala.collection.mutable.Map[String, WorkerInfo]()      // 用于保存Worker信息
  override def preStart(): Unit = {
    logger.info("Master启动成功")
  }
  override def receive: Receive = {
    case RegisterMessage(workerId, cores, memroy) =>
      logger.info(s"新的worker: workerId=$workerId, CPU核数=${cores},内存=${memroy}")
      // 将新的worker保存到Map结构中
      workerMap += (workerId -> WorkerInfo(workerId, cores, memroy))
      // 回复注册成功消息
      sender ! RegisterSuccessMessage
    case _ => Unit
  }
}
object SparkMaster {
  def main(args: Array[String]): Unit = {
    //准备配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "127.0.0.1"
         |akka.remote.netty.tcp.port = "8888"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
    // 创建ActorSystem
    val masterActorSystem = ActorSystem("masterActorSystem",config)
    val masterActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")
  }
}

创建SparkWorker.scala用来构建WorkerActorSystem和WorkerActor

object WorkerActor extends Actor {
  val logger = Logger.getLogger("WorkerActor")
  override def preStart(): Unit = {
    val masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
    // 给Master发送注册消息
    masterActorRef ! RegisterMessage(UUID.randomUUID().toString, "4", "1024")
  }
  override def receive: Receive = {
    case RegisterSuccessMessage =>
      logger.info("注册成功!")
    case _ => Unit
  }
}
object SparkWorker {
  def main(args: Array[String]): Unit = {
    //准备配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "127.0.0.1"
         |akka.remote.netty.tcp.port = "9101"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
    // 创建ActorSystem
    val workerActorSystem = ActorSystem("workerActorSystem",config)
    val workerActorRef = workerActorSystem.actorOf(Props(WorkerActor))
  }
}

5.2 Worker定时发送心跳阶段


Worker什么时候给Master发送心跳呢?是启动就给Master发送心跳,还是在接收到Master返回注册成功消息后再发送呢?

应该是接收到Master返回注册成功后,再发送心跳消息。

Master收到Worker发送的心跳消息后,需要更新对应Worker的最后心跳时间。

示例代码:

添加心跳消息到UserMessage.scala中

case class HeartBeat(workerId:String)

SparkWorker.scala增加定时发送心跳消息逻辑:

  override def receive: Receive = {
    case RegisterSuccessMessage =>
      logger.info("注册成功!")
      logger.info("发送心跳消息...")
      import scala.concurrent.duration._
      import context.dispatcher
      // 10秒发送一次心跳给Master
      context.system.scheduler.schedule(0 seconds, 3 seconds) {
        masterActorRef ! HeartBeat(workerId)
      }
    case _ => Unit
  }

WorkInfo样例类添加最后更新时间字段

case class WorkerInfo(workerId:String, cores:String, memory:String, var lastUpdateTime:Date)

SparkMaster.scala增加接收心跳消息逻辑:

  override def receive: Receive = {
    case RegisterMessage(workerId, cores, memroy) =>
      logger.info(s"新的worker: workerId=$workerId, CPU核数=${cores},内存=${memroy}")
      // 将新的worker保存到Map结构中
      workerMap += (workerId -> WorkerInfo(workerId, cores, memroy))
      // 回复注册成功消息
      sender ! RegisterSuccessMessage
    case HeartBeat(workerId) =>
      logger.info(s"接收到Worker:${workerId}心跳消息...")
    case _ => Unit
  }

5.3 Master定时心跳检测阶段


如果某个worker已经意外终止了,Master需要将该worker从当前的Worker集合中移除。Master中,还是可以通过Akka的定时任务,来实现心跳超时检查。

Master什么时候开始定期执行心跳超时检查呢?

——Master一启动就可以进行心跳超时检查了。

SparkMaster.scala定期移除超时心跳消息:

override def preStart(): Unit = {
    logger.info("Master启动成功")
    logger.info("启动心跳超时检查...")
    import scala.concurrent.duration._
    import context.dispatcher
    context.system.scheduler.schedule(0 seconds, 3 seconds) {
      // 过滤出来超时的worker
      val timeoutWorkerId: List[String] = workerMap.toList.filter {
        workInfo =>
          val now = new Date
          val interval = now.getTime - workInfo._2.lastHearBeatTime.getTime
          if (interval > 10 * 1000) true else false
      }.map(_._1)
      if(timeoutWorkerId.size > 0) {
        timeoutWorkerId.foreach{
          workerId =>
            logger.warning(s"移除超时worker: $workerId...")
        }
        // 移除全部超时Worker
        workerMap --= timeoutWorkerId
        // 获取、并按照内存倒序排序后的Worker列表
        val sortedWorkInfo = workerMap.toList.sortWith {
          (idAndInfo1, idAndInfo2) =>
            idAndInfo2._2.memory > idAndInfo1._2.memory
        }.map(_._2)
        println(sortedWorkInfo)
      }
    }

5.4 多个Worker测试阶段


为了启动多个worker,需要将端口号、CPU核数、内存通过命令行参数传递给Worker的main方法,然后进行初始化。

SparkWorker.scala代码

此处将object改为class,因为这样更容易接受两个参数:cores、memory

class WorkerActor(cores:String, memory:String) extends Actor {
  val workerId = UUID.randomUUID().toString
  val logger = Logger.getLogger("WorkerActor")
  var masterActorRef:ActorSelection = _
  override def preStart(): Unit = {
    masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
    // 给Master发送注册消息
    masterActorRef ! RegisterMessage(workerId, cores, memory)
    logger.info(s"CPU核数: $cores, 内存大小:$memory(MB)")
  }
  override def receive: Receive = {
    case RegisterSuccessMessage =>
      logger.info("注册成功!")
      logger.info("发送心跳消息...")
      import scala.concurrent.duration._
      import context.dispatcher
      // 10秒发送一次心跳给Master
      context.system.scheduler.schedule(0 seconds, 3 seconds) {
        masterActorRef ! HeartBeat(workerId)
      }
    case _ => Unit
  }
}
object SparkWorker {
  def main(args: Array[String]): Unit = {
    //准备配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "127.0.0.1"
         |akka.remote.netty.tcp.port = "${args(0)}"
      """.stripMargin
    // 配置config对象 利用ConfigFactory解析配置文件,获取配置信息
    val config=ConfigFactory.parseString(configStr)
    // 创建ActorSystem
    val workerActorSystem = ActorSystem("workerActorSystem",config)
    val workerActorRef = workerActorSystem.actorOf(Props(new WorkerActor(args(1), args(2))))
  }
}

UserEntity.scala,重写WorkerInfo的toString方法

case class WorkerInfo(workerId:String, cores:String, memory:String, var lastHearBeatTime:Date) {
  override def toString: String = s"内存: $memory - CPU核数: $cores"
}

目录
相关文章
|
11月前
|
并行计算 Scala
175 Scala 项目案例(Akka简介)
175 Scala 项目案例(Akka简介)
96 0
|
3月前
|
Scala 开发者
Scala中的模式匹配与高阶函数:探索强大的编程范式
【7月更文挑战第11天】Scala中的模式匹配和高阶函数是两种极其强大的特性,它们不仅提升了代码的表达力和可读性,还使得开发者能够编写出更加灵活和可重用的解决方案。通过
|
5月前
|
SQL 分布式计算 编译器
Scala:高阶函数、隐式转换(四)
Scala:高阶函数、隐式转换(四)
45 0
|
11月前
|
Scala
172 Scala 高阶函数例子
172 Scala 高阶函数例子
28 0
|
11月前
|
算法 Scala
171 Scala 高阶函数
171 Scala 高阶函数
33 0
|
算法 Java 编译器
Scala高阶函数与akka 1
Scala高阶函数与akka
62 0
Scala快速入门-9-高阶函数
作为值的函数 创建匿名函数 带函数参数的函数 闭包 柯里化
|
分布式计算 Shell API
scala函数式编程与高阶函数
谈到编程方式有指令式编程、过程化编程、函数式编程,函数式编程在使用时主要的思想就是把运算过程写成一些列嵌套的函数调用。把一组数据通过函数实现运算转化成另一组数据。函数式编程这种方式在我们将来开发Spark、Flink的业务代码时会大量使用。下面列出了一些我们经常使用的进行函数式编程的Iterable trait(可迭代特质)方法。
82 0
|
大数据 网络安全 Scala
大数据Scala系列之高阶函数
大数据Scala系列之高阶函数1.1. 概念如果一个函数的传入参数为函数或者返回值是函数,则该函数即为高阶函数。 1.2. 传入参数为函数Scala中,函数是头等公民,和数字一样。不仅可以调用,还可以在变量中存放函数,也可以作为参数传入函数,或者作为函数的返回值。
1033 0