Akka框架基本要点介绍

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。本文基本上是基于Akka的官方文档(版本是2.3.12),通过自己的理解,来阐述Akka提供的一些组件或概念,另外总结了Akka的一些使用场景。

Actor

维基百科这样定义Actor模型:

在计算科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。

Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。
通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

  • 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
  • 提供了异步非阻塞的、高性能的事件驱动编程模型
  • 超级轻量级事件处理(每GB堆内存几百万Actor)

实现一个Actor,可以继承特质akka.actor.Actor,实现一个receive方法,应该在receive方法中定义一系列的case语句,基于标准Scala的模式匹配方法,来实现每一种消息的处理逻辑。
我们先看一下Akka中特质Actor的定义:

01 trait Actor {
02
03 import Actor._
04
05 type Receive = Actor.Receive
06
07 implicit val context: ActorContext = {
08 val contextStack = ActorCell.contextStack.get
09 if ((contextStack.isEmpty) || (contextStack.head eq null))
10 throw ActorInitializationException(
11 s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
12 "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
13 val c = contextStack.head
14 ActorCell.contextStack.set(null :: contextStack)
15 c
16 }
17
18 implicit final val self = context.self //MUST BE A VAL, TRUST ME
19
20 final def sender(): ActorRef = context.sender()
21
22 def receive: Actor.Receive // 这个是在子类中一定要实现的抽象方法
23
24 protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit =receive.applyOrElse(msg, unhandled)
25
26 protected[akka] def aroundPreStart(): Unit = preStart()
27
28 protected[akka] def aroundPostStop(): Unit = postStop()
29
30 protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit= preRestart(reason, message)
31
32 protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
33
34 def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
35
36 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
37 def preStart(): Unit = () // 启动Actor之前需要执行的操作,默认为空实现,可以重写该方法
38
39 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
40 def postStop(): Unit = () // 终止Actor之前需要执行的操作,默认为空实现,可以重写该方法
41
42 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
43 def preRestart(reason: Throwable, message: Option[Any]): Unit = { // 重启Actor之前需要执行的操作,默认终止该Actor所监督的所有子Actor,然后调用postStop()方法,可以重写该方法
44 context.children foreach { child ⇒
45 context.unwatch(child)
46 context.stop(child)
47 }
48 postStop()
49 }
50
51 @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
52 def postRestart(reason: Throwable): Unit = { // 重启Actor之前需要执行的操作,默认执行preStart()的实现逻辑,可以重写该方法
53 preStart()
54 }
55
56 def unhandled(message: Any): Unit = {
57 message match {
58 case Terminated(dead) ⇒ throw new DeathPactException(dead)
59 case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
60 }
61 }
62 }

上面特质中提供了几个Hook,具体说明可以看代码中注释,我们可以在继承该特质时重写Hook方法,实现自己的处理逻辑。
一个Actor是有生命周期(Lifecycle)的,如下图所示:

通过上图我们可以看到,一除了/system路径下面的Actor外,一个Actor初始时路径为空,调用ActorSystem的actorOf方法创建一个Actor实例,返回一个引用ActorRef,它包括一个UID和一个Path,标识了一个Actor,可以通过该引用向该Actor实例发送消息。

ActorSystem

在Akka中,一个ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,按照逻辑划分的每个应用对应一个ActorSystem实例。
一个ActorSystem是具有分层结构(Hierarchical Structure)的:一个Actor能够管理(Oversee)某个特定的函数,他可能希望将一个task分解为更小的多个子task,这样它就需要创建多个子Actor(Child Actors),并监督这些子Actor处理任务的进度等详细情况,实际上这个Actor创建了一个Supervisor来监督管理子Actor执行拆分后的多个子task,如果一个子Actor执行子task失败,那么就要向Supervisor发送一个消息说明处理子task失败。需要知道的是,一个Actor能且仅能有一个Supervisor,就是创建它的那个Actor。基于被监控任务的性质和失败的性质,一个Supervisor可以选择执行如下操作选择:

  1. 重新开始(Resume)一个子Actor,保持它内部的状态
  2. 重启一个子Actor,清除它内部的状态
  3. 终止一个子Actor
  4. 扩大失败的影响,从而使这个子Actor失败

将一个Actor以一个监督层次结构视图来看是非常重要的,因为它诠释了上面第4种操作选择的存在性,而且对前3种操作选择也有影响:重新开始(Resume)一个Actor,则该Actor的所有子Actor都继续工作;重启一个Actor,则该Actor的所有子Actor都被重新启动;终止一个Actor,则该Actor的所有子Actor都被终止。另外,一个Actor的preRestart方法的默认行为是终止所有子Actor,如果我们不想这样,可以在继承Actor的实现中重写preRestart方法的逻辑。
一个ActorSystem在创建过程中,至少启动3个Actor,如下图所示:

上图是一个类似树状层次结构,ActorSystem的Top-Level层次结构,与Actor关联起来,称为Actor路径(Actor Path),不同的路径代表了不同的监督范围(Supervision Scope)。下面说明ActorSystem的监督范围:

  • “/”路径:通过根路径可以搜索到所有的Actor
  • “/user”路径:用户创建的Top-Level Actor在该路径下面,通过调用ActorSystem.actorOf来实现Actor的创建
  • “/system”路径:系统创建的Top-Level Actor在该路径下面
  • “/deadLetters”路径:消息被发送到已经终止,或者不存在的Actor,这些Actor都在该路径下面
  • “/temp”路径:被系统临时创建的Actor在该路径下面
  • “/remote”路径:改路径下存在的Actor,它们的Supervisor都是远程Actor的引用

TypedActor

TypedActor是Akka基于Active对象(Active Object)设计模式的一个实现,关于Active对象模式,可以看维基百科的定义:
Active对象模式解耦了在一个对象上执行方法和调用方法的逻辑,执行方法和调用方法分别在各自的线程执行上下文中。该模式的目标是通过使用异步方法调用和一个调度器来处理请求,从而实现并行计算处理,该模式由6个元素组成:

  • 一个Proxy对象,提供一个面向客户端的接口和一组公共的方法
  • 一个接口,定义了请求一个Active对象上的方法的集合
  • 一个来自客户端请求的列表
  • 一个调度器,确定下一次处理哪一个请求
  • Active对象上方法的实现
  • 一个回掉或者变量,供客户端接收请求被处理后的结果

通过前面对Actor的了解,我们知道Actor更适用于在Akka的Actor系统之间来实现并行计算处理,而TypedActor适用于桥接Actor系统和非Actor系统。TypedActor是基于JDK的Proxy来实现的,与Actor不同的是,Actor一次处理一个消息,而TypedActor一次处理一个调用(Call)。关于更多关于TypedActor,可以查看Akka文档。

Cluster

Akka Cluster提供了一个容错(Fault-Tolerant)、去中心化(Decentralized)、基于P2P的集群服务,而且不会出现单点故障(SPOF, Single Point Of Failure)。Akka基于Gossip实现集群服务,而且支持服务自动失败检测。
关于Gossip协议的说明,维基百科说明如下所示:

Gossip协议是点对点(Computer-to-Computer)通信协议的一种,它受社交网络中的流言传播的特点所启发。现在分布式系统常常使用Gossip协议来解决其他方式所无法解决的问题,或者是由于底层网络的超大特殊结构,或者是因为Gossip方案是解决这类问题最有效的一种方式。

一个Akka集群由一组成员节点组成,每个成员节点通过hostname:port:uid来唯一标识,并且每个成员节点之间是解耦合的(Decoupled)。一个Akka应用程序是一个分布式应用程序,它具有一个Actor的集合S,而每个节点上可以启动这个Akka应用S的集合的的一部分Actor,而不必是全集S。如果一个新的成员节点需要加入到Akka集群,只需要在集群中任意一个成员节点上执行Join命令即可。
Akka集群中各个成员节点之间的状态关系,如下图所示:

Akka集群中任何一个成员节点都有可能成为集群的Leader,这是基于Gossip收敛(Convergence)过程得到的确定性结果,没有经过选举的过程。Leader只是一种角色,在各轮Gossip收敛过程中Leader是不断变化的。Leader的职责是使成员节点进入/离开集群。
一个成员节点开始于joining状态,一旦所有其节点都看到了该新加入Akka集群的节点,则Leader会设置这个节点的状态为up。
如果一个节点安全离开Akka集群,可预期地它的状态会变为leaving状态,当Leader看到该节点为leaving状态,会将其状态修改为exiting,然后当所有节点看到该节点状态为exiting,则Leader将该节点移除,状态修改为removed状态。
如果一个节点处于unreachable状态,基于Gossip协议Leader是无法执行任何操作收敛(Convergence)到该节点的,所以unreachable状态的节点的状态是必须被改变的,它必须变成reachable状态或者down状态。如果该节点想再次加入到Akka集群,它必须需要重新启动,并且重新加入集群(经由joining状态)。

Remoting

Akka Remoting的设计目标是基于P2P风格的网络通信,所以它存在如下限制:

  • 不支持NAT(Network Address Translation)
  • 不支持负载均衡器(Load Balancers)

Akka提供了种方式来使用Remoting功能:

  • 通过调用actorSelection方法搜索一个actor,该方法输入的参数的模式为:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
  • 通过actorOf方法创建一个actor

下面看一下Remoting系统中故障恢复模型(Failure Recovery Model),如下图所示:

上图中,连接到一个远程系统的过程中,包括上面4种状态:在进行任何通信之前,系统处于Idle状态;当第一次一个消息尝试向远程系统发送,或者当远程系统连接过来,这时系统状态变为Active;当两个系统通信失败,连接丢失,这时系统变为Gated状态;当系统通信过程中,由于参与通信的系统的状态不一致导致系统无法恢复,这时远程系统变为Quarantined状态,只有重新启动系统才能恢复,重启后系统变为Active状态。

Persistence

Akka的持久性能够使得有状态的Actor实例保存它的内部状态,在Actor重启后能够更快的进行恢复。需要强调的是,持久化的仅仅是Actor的内部状态,而不是Actor当前的状态,Actor内部状态的变化会被一追加的方式存到到指定的存储中,一旦追加完成存储状态,这些数据就不会被更新。有状态的Actor通过重放(Replay)持久化的状态来快速恢复,重建内部状态。
Akka Persistence的架构有如下几个要点:

  • PersistentActor

它是一个持久的、有状态的Actor,能够将持久化消息到一个日志系统中。当一个PersistentActor重启的时候,它能够重放记录到日志系统中的消息,从而基于这些消息来恢复一个Actor的内部状态。

  • PersistentView

持久化视图是一个持久的有状态的Actor,它接收被记录到一个PersistentActor中的消息,但是它本身并不记录消息到日志系统,而是通过复制一个PersistentActor的消息流,来更新自己内部状态。

  • AtLeastOnceDelivery

提供了一个消息至少传递一次(At-Lease-Once)的语义,在发送者和接收者所在的JVM崩溃的时候,将消息传递到目的地。

  • Journal

一个日志系统存储发送给一个PersistentActor的消息序列,可以在应用程序中控制是否一个PersistentActor将消息序列记录到日志中。日志系统是支持插件式的,默认情况下,消息被记录到本地文件系统中。

Akka Camel

Akka提供了一个模块,能够与Apache Camel整合。Apache Camel是一个实现了EIP(Enterprise Integration Patterns)的整合框架,支持通过各种各样的协议进行消息交换。所以Akka的Actor可以通过Scala或Java API与其它系统进行通信,协议比如HTTP、SOAP、TCP、FTP、SMTP、JMS。

Akka适用场景

Akka适用场景非常广泛,这里根据一些已有的使用案例来总结一下,Akka能够在哪些应用场景下投入生产环境:

  • 事务处理(Transaction Processing)

在线游戏系统、金融/银行系统、交易系统、投注系统、社交媒体系统、电信服务系统。

  • 后端服务(Service Backend)

任何行业的任何类型的应用都可以使用,比如提供REST、SOAP等风格的服务,类似于一个服务总线,Akka支持纵向&横向扩展,以及容错/高可用(HA)的特性。

  • 并行计算(Concurrency/Parallelism)

任何具有并发/并行计算需求的行业,基于JVM的应用都可以使用,如使用编程语言Scala、Java、Groovy、JRuby开发。

  • 仿真

Master/Slave架构风格的计算系统、计算网格系统、MapReduce系统。

  • 通信Hub(Communications Hub)

电信系统、Web媒体系统、手机媒体系统。

  • 复杂事件流处理(Complex Event Stream Processing)

Akka本身提供的Actor就适合处理基于事件驱动的应用,所以可以更加容易处理具有复杂事件流的应用。

其它特性

Akka还支持很多其它特性,如下所示:

  • 支持Future,可以同步或异步地获取发送消息的结果
  • 支持基于事件的Dispatcher,将多个Actor与一个线程池绑定
  • 支持消息路由,可以提供不同的消息路由策略,如Akka支持如下策略:RoundRobinRoutingLogic、RandomRoutingLogic、SmallestMailboxRoutingLogic、BroadcastRoutingLogic、ScatterGatherFirstCompletedRoutingLogic、TailChoppingRoutingLogic、ConsistentHashingRoutingLogic
  • 支持FSM,提供基于事件的状态转移
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
缓存 负载均衡 网络协议
自己动手从0开始实现一个分布式RPC框架
如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异步调用、熔断降级等技术,可以全方位的提升基本素质。虽然也有相关源码,但是只看源码容易眼高手低,动手写一个才是自己真正掌握这门技术的最优路径。
自己动手从0开始实现一个分布式RPC框架
|
7月前
|
消息中间件 缓存 Kafka
探究Kafka原理-3.生产者消费者API原理解析(下)
探究Kafka原理-3.生产者消费者API原理解析
177 0
|
7月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
373 0
|
存储 缓存 分布式计算
20 MAPREDUCE框架结构及核心运行机制
20 MAPREDUCE框架结构及核心运行机制
59 0
|
负载均衡 Dubbo Java
RPC框架:从原理到选型,一文带你搞懂RPC(三)
RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
471 0
RPC框架:从原理到选型,一文带你搞懂RPC(三)
|
Dubbo 网络协议 JavaScript
RPC框架:从原理到选型,一文带你搞懂RPC(一)
RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
1466 0
RPC框架:从原理到选型,一文带你搞懂RPC(一)
|
XML JSON 运维
RPC框架:从原理到选型,一文带你搞懂RPC(二)
RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
493 0
RPC框架:从原理到选型,一文带你搞懂RPC(二)
|
TensorFlow 算法框架/工具 图计算
如何实现一个图化框架?代码已开源!
大家好,我是不会写代码的纯序员——Chunel Feng[3]。俗话说,人生如码,码如人生。人生中,有些事情是可以同时进行的,有些事情又必须是前后依次进行的;有些事情是可以刚开始就做的,有些事情又必须等待某个时机成熟了才可以开始。
381 0
如何实现一个图化框架?代码已开源!
|
设计模式
akka设计模式系列-基础模式
  本文介绍akka的基本使用方法,由于属于基础功能,想不出一个很高大上的名称,此处就以基础模式命名。下文会介绍actor的使用方法,及其优劣点。 class SimpleActor(name:String) extends Actor { private def doWork(message...
2320 0
|
设计模式 Java 数据处理
反应式编程 RxJava 设计原理解析
本篇文章主要聚焦对RxJava中几种主要的设计模式的理解,通过梳理Observable的相关类图以及讲解这些类之间的关系,让大家能够更清晰的理解RxJava中事件驱动的工作原理。
1097 0