Spark如何使用Akka实现进程、节点通信的简明介绍

简介: Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。

Akka简介

Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,而且线程的上下文切换会带来不少开销,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:

     Actor ! message
     Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。从下面Akka官网提供的一段代码示例,可以看出Akka并发编程的简约。
case class Greeting(who: String)
class GreetingActor extends Actor with ActorLogging {
def receive = {
case Greeting(who) ⇒ log.info("Hello " + who)
      }
}
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker")

Akka提供了分布式的框架,意味着用户不需要考虑如何实现分布式部署,Akka官网提供了下面的示例演示如何获取远程Actor的引用。

// config on all machines
akka {
 actor {
   provider = akka.remote.RemoteActorRefProvider
   deployment {
     /greeter {
       remote = akka.tcp://MySystem@machine1:2552
     }
   }
 }
}
// ------------------------------
// define the greeting actor and the greeting message
case class Greeting(who: String) extends Serializable
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")

Actor之间最终会构成一棵树,作为父亲的Actor应当对所有儿子的异常失败进行处理(监管)Akka给出了简单的示例,代码如下。

class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      ⇒ Resume
      case _: NullPointerException     ⇒ Restart
      case _: Exception                ⇒ Escalate
    }
  val worker = context.actorOf(Props[Worker])
  def receive = {
    case n: Int => worker forward n
  }
}

Akka的更多信息请访问官方网站:http://akka.io/

基于Akka的分布式消息系统ActorSystem

     Spark使用Akka提供的消息系统实现并发:ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。正是因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。
     SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。
    val (actorSystem, boundPort) =
      Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
      }

AkkaUtils.createActorSystem方法用于启动ActorSystem,代码如下。

  def createActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
      doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceOnPort(port, startService, conf, name)
  }

AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int=> (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具体实现细节请见AkkaUtils的详细介绍。关于startServiceOnPort的实现,请参阅《Spark中常用工具类Utils的简明介绍》一文的内容。

AkkaUtils

AkkaUtils是Spark对Akka相关API的又一层封装,这里对其常用的功能进行介绍。

(1)doCreateActorSystem

功能描述:创建ActorSystem。

private def doCreateActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {

    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
    val akkaFrameSize = maxFrameSizeBytes(conf)
    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
    if (!akkaLogLifecycleEvents) {
      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
    }
    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
    val akkaFailureDetector =
      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
val secretKey = securityManager.getSecretKey()
    val isAuthOn = securityManager.isAuthenticationEnabled()
    if (isAuthOn && secretKey == null) {
      throw new Exception("Secret key is null with authentication on")
    }
    val requireCookie = if (isAuthOn) "on" else "off"
    val secureCookie = if (isAuthOn) secretKey else ""
    logDebug("In createActorSystem, requireCookie is: " + requireCookie)
    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
      ConfigFactory.parseString(
      s"""
      |akka.daemonic = on
      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      |akka.stdout-loglevel = "ERROR"
      |akka.jvm-exit-on-fatal-error = off
      |akka.remote.require-cookie = "$requireCookie"
      |akka.remote.secure-cookie = "$secureCookie"
      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = $port
      |akka.remote.netty.tcp.tcp-nodelay = on
      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
      |akka.log-config-on-start = $logAkkaConfig
      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
      |akka.log-dead-letters = $lifecycleEvents
      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
      """.stripMargin))
    val actorSystem = ActorSystem(name, akkaConf)
    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
    val boundPort = provider.getDefaultAddress.port.get
    (actorSystem, boundPort)
  }

(2)makeDriverRef

功能描述:从远端ActorSystem中查找已经注册的某个Actor。

def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
    val driverActorSystemName = SparkEnv.driverActorSystemName
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    logInfo(s"Connecting to $name: $url")
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
相关文章
|
1月前
|
存储 Unix Linux
进程间通信方式-----管道通信
【10月更文挑战第29天】管道通信是一种重要的进程间通信机制,它为进程间的数据传输和同步提供了一种简单有效的方法。通过合理地使用管道通信,可以实现不同进程之间的协作,提高系统的整体性能和效率。
|
1月前
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
2月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
33 0
|
3月前
|
Java Android开发 数据安全/隐私保护
Android中多进程通信有几种方式?需要注意哪些问题?
本文介绍了Android中的多进程通信(IPC),探讨了IPC的重要性及其实现方式,如Intent、Binder、AIDL等,并通过一个使用Binder机制的示例详细说明了其实现过程。
404 4
|
4月前
|
Linux
Linux源码阅读笔记13-进程通信组件中
Linux源码阅读笔记13-进程通信组件中
|
4月前
|
消息中间件 安全 Java
Linux源码阅读笔记13-进程通信组件上
Linux源码阅读笔记13-进程通信组件上
|
4月前
|
消息中间件 存储 安全
python多进程并发编程之互斥锁与进程间的通信
python多进程并发编程之互斥锁与进程间的通信
|
4月前
|
微服务 Windows
【Azure微服务 Service Fabric 】在SF节点中开启Performance Monitor及设置抓取进程的方式
【Azure微服务 Service Fabric 】在SF节点中开启Performance Monitor及设置抓取进程的方式
|
4月前
|
Python
Python IPC深度探索:解锁跨进程通信的无限可能,以管道与队列为翼,让你的应用跨越边界,无缝协作,震撼登场
【8月更文挑战第3天】Python IPC大揭秘:解锁进程间通信新姿势,让你的应用无界连接
28 0
|
5月前
|
消息中间件 分布式计算 网络协议
从管道路由到共享内存:进程间通信的演变(内附通信方式经典面试题及详解)
进程间通信(Inter-Process Communication, IPC)是计算机科学中的一个重要概念,指的是运行在同一系统或不同系统上的多个进程之间互相发送和接收信息的能力。IPC机制允许进程间共享数据、协调执行流程,是实现分布式系统、多任务操作系统和并发编程的基础。
从管道路由到共享内存:进程间通信的演变(内附通信方式经典面试题及详解)

相关实验场景

更多