Spark如何使用Akka实现进程、节点通信的简明介绍

简介: Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。

Akka简介

Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,而且线程的上下文切换会带来不少开销,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:

     Actor ! message
     Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。从下面Akka官网提供的一段代码示例,可以看出Akka并发编程的简约。
case class Greeting(who: String)
class GreetingActor extends Actor with ActorLogging {
def receive = {
case Greeting(who) ⇒ log.info("Hello " + who)
      }
}
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker")

Akka提供了分布式的框架,意味着用户不需要考虑如何实现分布式部署,Akka官网提供了下面的示例演示如何获取远程Actor的引用。

// config on all machines
akka {
 actor {
   provider = akka.remote.RemoteActorRefProvider
   deployment {
     /greeter {
       remote = akka.tcp://MySystem@machine1:2552
     }
   }
 }
}
// ------------------------------
// define the greeting actor and the greeting message
case class Greeting(who: String) extends Serializable
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")

Actor之间最终会构成一棵树,作为父亲的Actor应当对所有儿子的异常失败进行处理(监管)Akka给出了简单的示例,代码如下。

class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      ⇒ Resume
      case _: NullPointerException     ⇒ Restart
      case _: Exception                ⇒ Escalate
    }
  val worker = context.actorOf(Props[Worker])
  def receive = {
    case n: Int => worker forward n
  }
}

Akka的更多信息请访问官方网站:http://akka.io/

基于Akka的分布式消息系统ActorSystem

     Spark使用Akka提供的消息系统实现并发:ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。正是因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。
     SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。
    val (actorSystem, boundPort) =
      Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
      }

AkkaUtils.createActorSystem方法用于启动ActorSystem,代码如下。

  def createActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
      doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceOnPort(port, startService, conf, name)
  }

AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int=> (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具体实现细节请见AkkaUtils的详细介绍。关于startServiceOnPort的实现,请参阅《Spark中常用工具类Utils的简明介绍》一文的内容。

AkkaUtils

AkkaUtils是Spark对Akka相关API的又一层封装,这里对其常用的功能进行介绍。

(1)doCreateActorSystem

功能描述:创建ActorSystem。

private def doCreateActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {

    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
    val akkaFrameSize = maxFrameSizeBytes(conf)
    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
    if (!akkaLogLifecycleEvents) {
      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
    }
    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
    val akkaFailureDetector =
      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
val secretKey = securityManager.getSecretKey()
    val isAuthOn = securityManager.isAuthenticationEnabled()
    if (isAuthOn && secretKey == null) {
      throw new Exception("Secret key is null with authentication on")
    }
    val requireCookie = if (isAuthOn) "on" else "off"
    val secureCookie = if (isAuthOn) secretKey else ""
    logDebug("In createActorSystem, requireCookie is: " + requireCookie)
    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
      ConfigFactory.parseString(
      s"""
      |akka.daemonic = on
      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      |akka.stdout-loglevel = "ERROR"
      |akka.jvm-exit-on-fatal-error = off
      |akka.remote.require-cookie = "$requireCookie"
      |akka.remote.secure-cookie = "$secureCookie"
      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = $port
      |akka.remote.netty.tcp.tcp-nodelay = on
      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
      |akka.log-config-on-start = $logAkkaConfig
      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
      |akka.log-dead-letters = $lifecycleEvents
      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
      """.stripMargin))
    val actorSystem = ActorSystem(name, akkaConf)
    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
    val boundPort = provider.getDefaultAddress.port.get
    (actorSystem, boundPort)
  }

(2)makeDriverRef

功能描述:从远端ActorSystem中查找已经注册的某个Actor。

def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
    val driverActorSystemName = SparkEnv.driverActorSystemName
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    logInfo(s"Connecting to $name: $url")
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
相关文章
|
1月前
|
消息中间件 Unix Linux
Linux进程间通信(IPC)介绍:详细解析IPC的执行流程、状态和通信机制
Linux进程间通信(IPC)介绍:详细解析IPC的执行流程、状态和通信机制
48 1
|
4月前
|
消息中间件 存储 Unix
进程间通信和线程间通信总结
写在前面 面试的时候一定不要疲劳战,比如上午面了一个,然后中午不休息直接赶到另外一个相距比较远的公司,影响状态。 面试的时候一定不要紧张,不管对方有几个人,总之面试的时候做好充分准备,休息好,放松心态。 好了,言归正传,开始总结。
39 0
|
4月前
|
消息中间件 存储 程序员
进程间的通信
进程间的通信
33 1
|
5月前
|
Linux
百度搜索:蓝易云【【Linux】进程间的通信之共享内存】
总结而言,Linux中的共享内存是一种用于进程间通信的高效机制,它允许多个进程共享同一块物理内存区域,实现数据的快速交换。通过适当的创建、连接、分离和删除操作,可以实现进程对共享内存的使用和管理。
223 7
|
3月前
|
编解码 监控 API
操作系统:进程的控制和通信(Windows2000)
操作系统:进程的控制和通信(Windows2000)
45 0
|
1月前
|
消息中间件 Linux API
跨进程通信设计:Qt 进程间通讯类全面解析
跨进程通信设计:Qt 进程间通讯类全面解析
78 0
|
1月前
|
消息中间件 并行计算 网络协议
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
34 0
|
2月前
|
Python
Python多进程间通信的最佳实践
Python多进程间通信的最佳实践
|
2月前
|
消息中间件 Linux
Linux下的系统编程——进程间的通信(九)
Linux下的系统编程——进程间的通信(九)
36 1
Linux下的系统编程——进程间的通信(九)
|
3月前
|
存储 缓存 Unix
C语言第四章(进程间的通信,管道通信,pipe()函数)
C语言第四章(进程间的通信,管道通信,pipe()函数)
53 0

相关实验场景

更多