Akka框架基本要点介绍

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。本文基本上是基于Akka的官方文档(版本是2.3.12),通过自己的理解,来阐述Akka提供的一些组件或概念,另外总结了Akka的一些使用场景。

Actor

维基百科这样定义Actor模型:

在计算科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。

Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。
通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

  • 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
  • 提供了异步非阻塞的、高性能的事件驱动编程模型
  • 超级轻量级事件处理(每GB堆内存几百万Actor)

实现一个Actor,可以继承特质akka.actor.Actor,实现一个receive方法,应该在receive方法中定义一系列的case语句,基于标准Scala的模式匹配方法,来实现每一种消息的处理逻辑。
我们先看一下Akka中特质Actor的定义:

01 trait Actor {
02
03 import Actor._
04
05 type Receive = Actor.Receive
06
07 implicit val context: ActorContext = {
08 val contextStack = ActorCell.contextStack.get
09 if ((contextStack.isEmpty) || (contextStack.head eq null))
10 throw ActorInitializationException(
11 s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
12 "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
13 val c = contextStack.head
14 ActorCell.contextStack.set(null :: contextStack)
15 c
16 }
17
18 implicit final val self = context.self //MUST BE A VAL, TRUST ME
19
20 final def sender(): ActorRef = context.sender()
21
22 def receive: Actor.Receive // 这个是在子类中一定要实现的抽象方法
23
24 protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit =receive.applyOrElse(msg, unhandled)
25
26 protected[akka] def aroundPreStart(): Unit = preStart()
27
28 protected[akka] def aroundPostStop(): Unit = postStop()
29
30 protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit= preRestart(reason, message)
31
32 protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
33
34 def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
35
36 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
37 def preStart(): Unit = () // 启动Actor之前需要执行的操作,默认为空实现,可以重写该方法
38
39 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
40 def postStop(): Unit = () // 终止Actor之前需要执行的操作,默认为空实现,可以重写该方法
41
42 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
43 def preRestart(reason: Throwable, message: Option[Any]): Unit = { // 重启Actor之前需要执行的操作,默认终止该Actor所监督的所有子Actor,然后调用postStop()方法,可以重写该方法
44 context.children foreach { child ⇒
45 context.unwatch(child)
46 context.stop(child)
47 }
48 postStop()
49 }
50
51 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
52 def postRestart(reason: Throwable): Unit = { // 重启Actor之前需要执行的操作,默认执行preStart()的实现逻辑,可以重写该方法
53 preStart()
54 }
55
56 def unhandled(message: Any): Unit = {
57 message match {
58 case Terminated(dead) ⇒ throw new DeathPactException(dead)
59 case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
60 }
61 }
62 }

上面特质中提供了几个Hook,具体说明可以看代码中注释,我们可以在继承该特质时重写Hook方法,实现自己的处理逻辑。
一个Actor是有生命周期(Lifecycle)的,如下图所示:

通过上图我们可以看到,一除了/system路径下面的Actor外,一个Actor初始时路径为空,调用ActorSystem的actorOf方法创建一个Actor实例,返回一个引用ActorRef,它包括一个UID和一个Path,标识了一个Actor,可以通过该引用向该Actor实例发送消息。

ActorSystem

在Akka中,一个ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,按照逻辑划分的每个应用对应一个ActorSystem实例。
一个ActorSystem是具有分层结构(Hierarchical Structure)的:一个Actor能够管理(Oversee)某个特定的函数,他可能希望将一个task分解为更小的多个子task,这样它就需要创建多个子Actor(Child Actors),并监督这些子Actor处理任务的进度等详细情况,实际上这个Actor创建了一个Supervisor来监督管理子Actor执行拆分后的多个子task,如果一个子Actor执行子task失败,那么就要向Supervisor发送一个消息说明处理子task失败。需要知道的是,一个Actor能且仅能有一个Supervisor,就是创建它的那个Actor。基于被监控任务的性质和失败的性质,一个Supervisor可以选择执行如下操作选择:

  1. 重新开始(Resume)一个子Actor,保持它内部的状态
  2. 重启一个子Actor,清除它内部的状态
  3. 终止一个子Actor
  4. 扩大失败的影响,从而使这个子Actor失败

将一个Actor以一个监督层次结构视图来看是非常重要的,因为它诠释了上面第4种操作选择的存在性,而且对前3种操作选择也有影响:重新开始(Resume)一个Actor,则该Actor的所有子Actor都继续工作;重启一个Actor,则该Actor的所有子Actor都被重新启动;终止一个Actor,则该Actor的所有子Actor都被终止。另外,一个Actor的preRestart方法的默认行为是终止所有子Actor,如果我们不想这样,可以在继承Actor的实现中重写preRestart方法的逻辑。
一个ActorSystem在创建过程中,至少启动3个Actor,如下图所示:

上图是一个类似树状层次结构,ActorSystem的Top-Level层次结构,与Actor关联起来,称为Actor路径(Actor Path),不同的路径代表了不同的监督范围(Supervision Scope)。下面说明ActorSystem的监督范围:

  • “/”路径:通过根路径可以搜索到所有的Actor
  • “/user”路径:用户创建的Top-Level Actor在该路径下面,通过调用ActorSystem.actorOf来实现Actor的创建
  • “/system”路径:系统创建的Top-Level Actor在该路径下面
  • “/deadLetters”路径:消息被发送到已经终止,或者不存在的Actor,这些Actor都在该路径下面
  • “/temp”路径:被系统临时创建的Actor在该路径下面
  • “/remote”路径:改路径下存在的Actor,它们的Supervisor都是远程Actor的引用

TypedActor

TypedActor是Akka基于Active对象(Active Object)设计模式的一个实现,关于Active对象模式,可以看维基百科的定义:
Active对象模式解耦了在一个对象上执行方法和调用方法的逻辑,执行方法和调用方法分别在各自的线程执行上下文中。该模式的目标是通过使用异步方法调用和一个调度器来处理请求,从而实现并行计算处理,该模式由6个元素组成:

  • 一个Proxy对象,提供一个面向客户端的接口和一组公共的方法
  • 一个接口,定义了请求一个Active对象上的方法的集合
  • 一个来自客户端请求的列表
  • 一个调度器,确定下一次处理哪一个请求
  • Active对象上方法的实现
  • 一个回掉或者变量,供客户端接收请求被处理后的结果

通过前面对Actor的了解,我们知道Actor更适用于在Akka的Actor系统之间来实现并行计算处理,而TypedActor适用于桥接Actor系统和非Actor系统。TypedActor是基于JDK的Proxy来实现的,与Actor不同的是,Actor一次处理一个消息,而TypedActor一次处理一个调用(Call)。关于更多关于TypedActor,可以查看Akka文档。

Cluster

Akka Cluster提供了一个容错(Fault-Tolerant)、去中心化(Decentralized)、基于P2P的集群服务,而且不会出现单点故障(SPOF, Single Point Of Failure)。Akka基于Gossip实现集群服务,而且支持服务自动失败检测。
关于Gossip协议的说明,维基百科说明如下所示:

Gossip协议是点对点(Computer-to-Computer)通信协议的一种,它受社交网络中的流言传播的特点所启发。现在分布式系统常常使用Gossip协议来解决其他方式所无法解决的问题,或者是由于底层网络的超大特殊结构,或者是因为Gossip方案是解决这类问题最有效的一种方式。

一个Akka集群由一组成员节点组成,每个成员节点通过hostname:port:uid来唯一标识,并且每个成员节点之间是解耦合的(Decoupled)。一个Akka应用程序是一个分布式应用程序,它具有一个Actor的集合S,而每个节点上可以启动这个Akka应用S的集合的的一部分Actor,而不必是全集S。如果一个新的成员节点需要加入到Akka集群,只需要在集群中任意一个成员节点上执行Join命令即可。
Akka集群中各个成员节点之间的状态关系,如下图所示:

Akka集群中任何一个成员节点都有可能成为集群的Leader,这是基于Gossip收敛(Convergence)过程得到的确定性结果,没有经过选举的过程。Leader只是一种角色,在各轮Gossip收敛过程中Leader是不断变化的。Leader的职责是使成员节点进入/离开集群。
一个成员节点开始于joining状态,一旦所有其节点都看到了该新加入Akka集群的节点,则Leader会设置这个节点的状态为up。
如果一个节点安全离开Akka集群,可预期地它的状态会变为leaving状态,当Leader看到该节点为leaving状态,会将其状态修改为exiting,然后当所有节点看到该节点状态为exiting,则Leader将该节点移除,状态修改为removed状态。
如果一个节点处于unreachable状态,基于Gossip协议Leader是无法执行任何操作收敛(Convergence)到该节点的,所以unreachable状态的节点的状态是必须被改变的,它必须变成reachable状态或者down状态。如果该节点想再次加入到Akka集群,它必须需要重新启动,并且重新加入集群(经由joining状态)。

Remoting

Akka Remoting的设计目标是基于P2P风格的网络通信,所以它存在如下限制:

  • 不支持NAT(Network Address Translation)
  • 不支持负载均衡器(Load Balancers)

Akka提供了种方式来使用Remoting功能:

  • 通过调用actorSelection方法搜索一个actor,该方法输入的参数的模式为:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
  • 通过actorOf方法创建一个actor

下面看一下Remoting系统中故障恢复模型(Failure Recovery Model),如下图所示:

上图中,连接到一个远程系统的过程中,包括上面4种状态:在进行任何通信之前,系统处于Idle状态;当第一次一个消息尝试向远程系统发送,或者当远程系统连接过来,这时系统状态变为Active;当两个系统通信失败,连接丢失,这时系统变为Gated状态;当系统通信过程中,由于参与通信的系统的状态不一致导致系统无法恢复,这时远程系统变为Quarantined状态,只有重新启动系统才能恢复,重启后系统变为Active状态。

Persistence

Akka的持久性能够使得有状态的Actor实例保存它的内部状态,在Actor重启后能够更快的进行恢复。需要强调的是,持久化的仅仅是Actor的内部状态,而不是Actor当前的状态,Actor内部状态的变化会被一追加的方式存到到指定的存储中,一旦追加完成存储状态,这些数据就不会被更新。有状态的Actor通过重放(Replay)持久化的状态来快速恢复,重建内部状态。
Akka Persistence的架构有如下几个要点:

  • PersistentActor

它是一个持久的、有状态的Actor,能够将持久化消息到一个日志系统中。当一个PersistentActor重启的时候,它能够重放记录到日志系统中的消息,从而基于这些消息来恢复一个Actor的内部状态。

  • PersistentView

持久化视图是一个持久的有状态的Actor,它接收被记录到一个PersistentActor中的消息,但是它本身并不记录消息到日志系统,而是通过复制一个PersistentActor的消息流,来更新自己内部状态。

  • AtLeastOnceDelivery

提供了一个消息至少传递一次(At-Lease-Once)的语义,在发送者和接收者所在的JVM崩溃的时候,将消息传递到目的地。

  • Journal

一个日志系统存储发送给一个PersistentActor的消息序列,可以在应用程序中控制是否一个PersistentActor将消息序列记录到日志中。日志系统是支持插件式的,默认情况下,消息被记录到本地文件系统中。

Akka Camel

Akka提供了一个模块,能够与Apache Camel整合。Apache Camel是一个实现了EIP(Enterprise Integration Patterns)的整合框架,支持通过各种各样的协议进行消息交换。所以Akka的Actor可以通过Scala或Java API与其它系统进行通信,协议比如HTTP、SOAP、TCP、FTP、SMTP、JMS。

Akka适用场景

Akka适用场景非常广泛,这里根据一些已有的使用案例来总结一下,Akka能够在哪些应用场景下投入生产环境:

  • 事务处理(Transaction Processing)

在线游戏系统、金融/银行系统、交易系统、投注系统、社交媒体系统、电信服务系统。

  • 后端服务(Service Backend)

任何行业的任何类型的应用都可以使用,比如提供REST、SOAP等风格的服务,类似于一个服务总线,Akka支持纵向&横向扩展,以及容错/高可用(HA)的特性。

  • 并行计算(Concurrency/Parallelism)

任何具有并发/并行计算需求的行业,基于JVM的应用都可以使用,如使用编程语言Scala、Java、Groovy、JRuby开发。

  • 仿真

Master/Slave架构风格的计算系统、计算网格系统、MapReduce系统。

  • 通信Hub(Communications Hub)

电信系统、Web媒体系统、手机媒体系统。

  • 复杂事件流处理(Complex Event Stream Processing)

Akka本身提供的Actor就适合处理基于事件驱动的应用,所以可以更加容易处理具有复杂事件流的应用。

其它特性

Akka还支持很多其它特性,如下所示:

  • 支持Future,可以同步或异步地获取发送消息的结果
  • 支持基于事件的Dispatcher,将多个Actor与一个线程池绑定
  • 支持消息路由,可以提供不同的消息路由策略,如Akka支持如下策略:RoundRobinRoutingLogic、RandomRoutingLogic、SmallestMailboxRoutingLogic、BroadcastRoutingLogic、ScatterGatherFirstCompletedRoutingLogic、TailChoppingRoutingLogic、ConsistentHashingRoutingLogic
  • 支持FSM,提供基于事件的状态转移
相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
消息中间件 安全 网络协议
Akka事件驱动新选择
在高并发场景解决方案中,多从线程角度出发,以解决线程安全问题,锁范围又需要多业务场景考虑,何时上锁,何时解锁,何时自动过期等,而事件驱动是从执行什么操作驱动的,在软件系统的设计层面,两者关联性不大,一个强调安全,一个强调策略,那么有没有两者结合解决并发编程难的事件驱动解决方案呢?带着场景解决方案我们走进Akka。
490 0
Akka事件驱动新选择
|
JSON 负载均衡 前端开发
一文带你详细了解Open API设计规范
一文带你详细了解Open API设计规范
7960 1
|
SQL 存储 Oracle
6 张图带你彻底搞懂分布式事务 XA 模式
XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口。目前主流的数据库,比如 oracle、DB2 都是支持 XA 协议的。
13476 1
6 张图带你彻底搞懂分布式事务 XA 模式
|
前端开发 网络协议 Dubbo
超详细Netty入门,看这篇就够了!
本文主要讲述Netty框架的一些特性以及重要组件,希望看完之后能对Netty框架有一个比较直观的感受,希望能帮助读者快速入门Netty,减少一些弯路。
90697 32
超详细Netty入门,看这篇就够了!
|
Web App开发 XML Java
java.lang.IllegalStateException: Async support must be enabled on a servlet and for all filters invo
Spring MVC 项目在使用 DeferredResult 实现异步接口时出现错误。 完整错误信息如下: 十一月 03, 2017 8:59:53 上午 org.apache.catalina.
2695 0
|
人工智能 算法 Java
AI:互联网程序设计竞赛之蓝桥杯大赛的简介、奖项设置、大赛内容以及蓝桥杯与ACM(ICPC)的四个维度对比之详细攻略
AI:互联网程序设计竞赛之蓝桥杯大赛的简介、奖项设置、大赛内容以及蓝桥杯与ACM(ICPC)的四个维度对比之详细攻略
AI:互联网程序设计竞赛之蓝桥杯大赛的简介、奖项设置、大赛内容以及蓝桥杯与ACM(ICPC)的四个维度对比之详细攻略
|
分布式计算 并行计算 数据库
Schedulerx2.0分布式计算原理&最佳实践
1. 前言 Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。
25985 2
|
存储 弹性计算 固态存储
三分钟磁盘存储性能IOPS、I/O及吞吐量指标详解
什么是磁盘I/O?存储IOPS是什么?磁盘性能指标吞吐量是指什么?
17326 39
|
消息中间件 Java 数据库
Spring事务监听机制---使用@TransactionalEventListener处理数据库事务提交成功后再执行操作(附:Spring4.2新特性讲解)【享学Spring】(上)
Spring事务监听机制---使用@TransactionalEventListener处理数据库事务提交成功后再执行操作(附:Spring4.2新特性讲解)【享学Spring】(上)