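Akka is a library written in Scala that implements asynchronous processing on top of an event-driven architecture and simplifies writing distributed applications. Its central concept is the Actor model, which provides a high-level abstraction for distributed and parallel programs. In practice, this frees developers from handling low-level network communication details and from managing locks in multithreaded code.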
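The main properties Akka brings to an application are:
- Fault tolerance
- Scalability
- Asynchrony
- Event-driven architecture (EDA)
- Remote (location) transparency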
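The Actor is the core component of Akka, so much so that most of the time spent writing an Akka application goes into working with Actors. So what kind of abstraction is an Actor? An Actor encapsulates state and behavior, but it does not share that state with any other Actor. If one Actor wants to interact with another, it can do so only by sending messages. As a result, an Actor's internal state is well protected.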
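Communicating with a Local Actor
Let's start with the simplest kind of Actor programming to get a feel for Akka. First, we define several message types that the later examples use to communicate: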

    package org.shirdrn.scala.akka

    object Start extends Serializable
    object Stop extends Serializable

    // The declaration of Message was lost from the original listing.
    // This definition is an assumption inferred from how it is used below.
    trait Message {
      val id: String
    }

    case class Shutdown(waitSecs: Int) extends Serializable
    case class Heartbeat(id: String, magic: Int) extends Message with Serializable
    case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable
    case class Packet(id: String, seq: Long, content: String) extends Message with Serializable

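To implement an Actor, extend the trait akka.actor.Actor and implement its receive method. Optionally, mix in the trait akka.actor.ActorLogging to get logging support. We first implement an Actor, then obtain a reference to it (an ActorRef) and interact with it by sending messages. The Actor class is LocalActor: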

    import akka.actor.{Actor, ActorLogging}

    class LocalActor extends Actor with ActorLogging {

      // Log every message type this Actor understands.
      def receive = {
        case Start => log.info("start")
        case Stop => log.info("stop")
        case Heartbeat(id, magic) => log.info("Heartbeat" + ((id, magic)))
        case Header(id, len, encrypted) => log.info("Header" + ((id, len, encrypted)))
        case Packet(id, seq, content) => log.info("Packet" + ((id, seq, content)))
      }
    }

Next, we implement an object with a main entry point that interacts with the LocalActor above:

    import akka.actor.{ActorSystem, Props}
    // JSONObject comes from a JSON library whose import is not shown in the original listing.

    object LocalClient extends App {

      val system = ActorSystem("local-system")

      // Create the local Actor and obtain its ActorRef.
      val localActorRef = system.actorOf(Props[LocalActor], name = "local-actor")
      println(localActorRef)

      localActorRef ! Heartbeat("3099100", 0xabcd)

      // Build a JSON payload and send it as a Packet.
      val content = new JSONObject()
      content.put("name", "Stone")
      content.put("empid", 51082001)
      content.put("score", 89.36581)
      localActorRef ! Packet("3000001", System.currentTimeMillis(), content.toString)
    }

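Although we have implemented only a single local Actor, this is already useful. For example, when several modules in the same JVM need to communicate through messages, each can be implemented as a local Actor, and together they can carry out complex processing logic by exchanging messages.
Communicating with a Remote Actor
In distributed applications, nodes usually need to communicate, that is, exchange messages, across machine boundaries. With Akka, this is abstracted as interaction among Actors running on different nodes. Thanks to this high-level abstraction, writing a distributed application with Akka is as simple as writing a local one. Below we implement a pseudo-distributed application in which Actors communicate across different JVMs; the implementation is the same as it would be across different nodes.
We use the configuration file application.conf to configure the Actors involved in the communication, including the hostname (or IP address) and port of the remote Actor and the basic settings of the local Actor. The file only needs to be placed on the CLASSPATH; Akka processes it with ConfigFactory, the configuration parsing class provided by Typesafe. The contents of application.conf are as follows: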

    MyRemoteServerSideActor {
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2552
          }
        }
      }
    }

    MyRemoteClientSideActor {
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
    }

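Above, MyRemoteServerSideActor holds the configuration of the remote Actor: the Actor provider is set to akka.remote.RemoteActorRefProvider, and the TCP transport is bound to hostname 127.0.0.1 and port 2552. MyRemoteClientSideActor holds the configuration of the local Actor, whose provider is also akka.remote.RemoteActorRefProvider. Now let's look at the code.
A remote Actor is implemented in the same way as a local one: extend the Actor trait and implement the receive method. Our RemoteActor looks like this: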

    import akka.actor.{Actor, ActorLogging}

    class RemoteActor extends Actor with ActorLogging {

      val SUCCESS = "SUCCESS"
      val FAILURE = "FAILURE"

      def receive = {
        case Start => log.info("RECV event: " + Start)
        case Stop => log.info("RECV event: " + Stop)
        case Shutdown(waitSecs) =>
          log.info("Wait to shutdown: waitSecs=" + waitSecs)
          // Thread.sleep expects milliseconds, so despite its name the value
          // is treated as milliseconds here.
          Thread.sleep(waitSecs)
          log.info("Shutdown this system.")
          // Stops the whole ActorSystem (replaced by terminate() in Akka 2.4 and later).
          context.system.shutdown
        case Heartbeat(id, magic) => log.info("RECV heartbeat: " + ((id, magic)))
        case Header(id, len, encrypted) => log.info("RECV header: " + ((id, len, encrypted)))
        case Packet(id, seq, content) =>
          val originalSender = sender
          log.info("RECV packet: " + ((id, seq, content)))
          // Reply to the sender with the sequence number and the result.
          originalSender ! ((seq, SUCCESS))
      }
    }

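The Actor above handles several message types: Start, Stop, Shutdown, Heartbeat, Header, and Packet. A Shutdown message terminates the remote ActorSystem, after which it can no longer process any requests. A Packet message causes a reply to be sent back to the sender, reporting the processing result.
An Actor can trigger this shutdown from inside its own message handler by calling context.system.shutdown. Note that this stops the whole ActorSystem, not just the calling Actor; to stop only itself, an Actor can call context.stop(self).
Start the remote Actor system we implemented and wait for incoming messages: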

    import akka.actor.{ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    object AkkaServerApplication extends App {

      val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor"))
      val log = system.log
      log.info("Remote server actor started: " + system)

      system.actorOf(Props[RemoteActor], "remoteActor")
    }

This is the main entry point. Starting the program prints the following to the console:

    [INFO] [08/14/2015 11:52:45.747] [main] [Remoting] Starting remoting
    [INFO] [08/14/2015 11:52:46.239] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system

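As we can see, this matches what we set in the configuration file and in the code: the ActorSystem is named remote-system, and the communication address is 127.0.0.1:2552.
Next, let's look at the client Actor that runs locally and communicates with the remote Actor: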
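
    import akka.actor.{Actor, ActorLogging}

    // Parts of the original listing were lost. The path value, the Start/Stop
    // handlers, and the bodies of sendWithCheck and send are reconstructed
    // from context and should be read as assumptions.
    class ClientActor extends Actor with ActorLogging {

      // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
      val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor"
      val remoteServerRef = context.actorSelection(path)

      @volatile var connected = false
      @volatile var stop = false

      def receive = {
        case Start =>
          send(Start)
          if (!connected) {
            connected = true
            log.info("Actor connected: " + this)
          }
        case Stop =>
          send(Stop)
          stop = true
          connected = false
        case header: Header => send(header)
        case hb: Heartbeat => sendWithCheck(hb)
        case pkt: Packet => sendWithCheck(pkt)
        case cmd: Shutdown => send(cmd)
        // Reply from the remote Actor.
        case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result)
        case m => log.info("Unknown message: " + m)
      }

      // Forwards the message only once the Actor is connected and not stopped.
      private def sendWithCheck(cmd: Serializable): Unit = {
        while (!connected) {
          Thread.sleep(100)
          log.info("Wait to be connected...")
        }
        if (!stop) send(cmd) else log.warning("Actor has stopped!")
      }

      private def send(cmd: Serializable): Unit = {
        log.info("Send command to server: " + cmd)
        try {
          remoteServerRef ! cmd
        } catch {
          case e: Exception =>
            connected = false
            log.info("Try to connect by sending Start command...")
            send(Start)
        }
      }
    }
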
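The local Actor receives messages sent from within the current JVM, performs a simple check, and forwards them to the remote Actor. It also receives the processing results sent back by the remote Actor. The local messages it receives and forwards are of the following types: Start, Stop, Shutdown, Header, Heartbeat, and Packet. In the local client we create a separate thread that periodically sends Heartbeat messages to the remote Actor, while a large number of Packet messages are sent to the remote Actor for processing. The response from the remote Actor is a tuple from which seq and result can be extracted to inspect the outcome for a given message. The local client is implemented as follows: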

    import java.text.SimpleDateFormat
    import java.util.Date
    import java.util.concurrent.atomic.AtomicLong
    import scala.util.Random
    import akka.actor.{ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    // JSONObject comes from a JSON library whose import is not shown in the original listing.

    // Lines marked "assumed" were lost from the original listing and are reconstructed from context.
    object AkkaClientApplication extends App {

      val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor"))
      val log = system.log // assumed
      val clientActor = system.actorOf(Props[ClientActor], "clientActor")
      @volatile var running = true
      val hbInterval = 1000 // assumed value: heartbeat interval in milliseconds

      lazy val hbWorker = createHBWorker

      // Creates the thread that sends a Heartbeat every hbInterval milliseconds.
      def createHBWorker: Thread = {
        new Thread("HB-WORKER") {
          override def run(): Unit = {
            while (running) { // assumed loop condition
              clientActor ! Heartbeat("HB", 39264)
              Thread.sleep(hbInterval)
            }
          }
        }
      }

      def format(timestamp: Long, format: String): String = {
        val df = new SimpleDateFormat(format)
        df.format(new Date(timestamp))
      }

      def createPacket(packet: Map[String, _]): JSONObject = {
        val pkt = new JSONObject()
        packet.foreach(p => pkt.put(p._1, p._2))
        pkt
      }

      val ID = new AtomicLong(90760000)
      def nextTxID: Long = {
        ID.incrementAndGet() // assumed
      }

      clientActor ! Start // assumed: connect before sending anything else
      clientActor ! Header("HEADER", 20, encrypted = false)
      hbWorker.start() // assumed

      // Send a batch of packets.
      val DT_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
      val r = Random // assumed
      val packetCount = 100 // assumed value
      val serviceProviders = Seq("CMCC", "AKBBC", "OLE")
      val payServiceProvicers = Seq("PayPal", "CMB", "ICBC", "ZMB", "XXB")

      def nextProvider(seq: Seq[String]): String = {
        seq(r.nextInt(seq.size))
      }

      val startWhen = System.currentTimeMillis()
      for (i <- 0 until packetCount) {
        val pkt = createPacket(Map[String, Any](
          "txid" -> nextTxID, // assumed key
          "pvid" -> nextProvider(serviceProviders),
          "txtm" -> format(System.currentTimeMillis(), DT_FORMAT),
          "payp" -> nextProvider(payServiceProvicers),
          "amount" -> 1000 * r.nextFloat()))
        clientActor ! Packet("PKT", System.currentTimeMillis, pkt.toString)
      }
      val finishWhen = System.currentTimeMillis()
      // Guard against a zero elapsed time to avoid division by zero.
      log.info("FINISH: timeTaken=" + (finishWhen - startWhen) + ", avg=" + packetCount / math.max(1L, finishWhen - startWhen))

      // Ask the remote Actor to shut down.
      val waitSecs = hbInterval
      clientActor ! Shutdown(waitSecs)

      running = false // assumed: lets the heartbeat loop finish
      while (hbWorker.isAlive) {
        log.info("Wait heartbeat worker to exit...")
        Thread.sleep(300) // assumed
      }
      system.shutdown // assumed
    }

The comments in the code above explain the implementation in detail.
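The heartbeat worker in the client above is a hand-written thread with a sleep loop. As an alternative sketch (not the approach used in this article), the same periodic behavior can be expressed with a ScheduledExecutorService from the JDK. The names HeartbeatScheduler and start, and the beat callback parameter, are hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object HeartbeatScheduler {

  // Runs `beat` every `intervalMillis` milliseconds on a single background thread.
  // The caller must shut down the returned executor to stop the heartbeats.
  def start(intervalMillis: Long)(beat: () => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit = beat()
    }
    // First run happens immediately, then at a fixed rate.
    scheduler.scheduleAtFixedRate(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

In the client, a call such as `val hb = HeartbeatScheduler.start(1000)(() => clientActor ! Heartbeat("HB", 39264))`, followed later by `hb.shutdown()`, would take the place of the running flag and the isAlive polling loop. One caveat of scheduleAtFixedRate is that an exception thrown by the task suppresses all later runs.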
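An Example Using Akka Futures
In the two cases above, we simulated how Actors communicate in local and remote contexts, with Akka hiding the underlying network communication details. Next, let's look at Akka's Future support, in particular the asynchronous callbacks that a Future provides.
Our Akka-based example is shown in the figure below:
The figure above models a simple but interesting crawler system. To demonstrate the use of Akka, we deliberately added a good deal of message passing between the Actors; the numbered arrows in the figure show the execution flow of the whole example system.
To store page links, as well as the outlinks of a given page, we use a MySQL database with two tables. The database and table definitions are as follows: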

    GRANT ALL ON *.* TO 'web'@'%' IDENTIFIED BY 'web';

    CREATE DATABASE `page_db` DEFAULT CHARACTER SET utf8;

    CREATE TABLE `web_link` (
      `link` CHAR(128) NOT NULL UNIQUE,
      `domain` VARCHAR(64) NOT NULL,
      `encoding` VARCHAR(11) NOT NULL DEFAULT 'utf-8',
      `content_length` INT NOT NULL DEFAULT 0,
      `create_time` VARCHAR(20) NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    CREATE TABLE `web_outlink` (
      `link` CHAR(128) NOT NULL,
      `outlink` VARCHAR(128) NOT NULL,
      `create_time` VARCHAR(20) NOT NULL,
      PRIMARY KEY (`link`, `outlink`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

In addition, since Actors in Akka communicate by sending messages, we first define a few case classes:

    case class WebUrl(link: String)
    case class ScheduledWebUrl(link: String, config: Map[String, Any])
    case class CrawledWeb(link: String, domain: String, encoding: String, contentLength: Int, outlinks: Set[String])
    case class Stored(link: String, outlinkCount: Int)

We also implemented two utility classes, one for MySQL operations and one for date/time conversion:

    import java.sql.{Connection, DriverManager, SQLException}
    import java.text.SimpleDateFormat
    import java.util.Date

    object MySQLUtils {

      val driverClass = "com.mysql.jdbc.Driver"
      // The connection settings were lost from the original listing. The values
      // below are assumptions based on the GRANT and CREATE DATABASE statements above.
      val jdbcUrl = "jdbc:mysql://localhost:3306/page_db"
      val user = "web"
      val password = "web"

      // Load the JDBC driver once, when the object is initialized.
      Class.forName(driverClass)

      @throws(classOf[SQLException])
      def getConnection: Connection = {
        DriverManager.getConnection(jdbcUrl, user, password)
      }

      // Executes all statements in a single transaction and always closes the connection.
      @throws(classOf[SQLException])
      def doTrancation(transactions: Set[String]): Unit = {
        val connection = getConnection
        try {
          connection.setAutoCommit(false)
          transactions.foreach {
            connection.createStatement.execute(_)
          }
          connection.commit()
        } finally {
          connection.close()
        }
      }
    }

    object DatetimeUtils {

      val DEFAULT_DT_FORMAT = "yyyy-MM-dd HH:mm:ss"

      def format(timestamp: Long, format: String): String = {
        val df = new SimpleDateFormat(format)
        df.format(new Date(timestamp))
      }

      def format(timestamp: Long): String = format(timestamp, DEFAULT_DT_FORMAT)
    }

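Below we go through the implementation of each component in the figure above:
AkkaCrawlApp is the entry point of the program. It reads the seed links and sends them to the CrawlActor, which sets the whole system in motion. AkkaCrawlApp is implemented as follows: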

    import akka.actor.{ActorSystem, Props}

    object AkkaCrawlApp {

      def main(args: Array[String]): Unit = {
        val system = ActorSystem("crawler-system")
        system.log.info(system.toString)

        val scheduleActorRef = system.actorOf(Props[ScheduleActor], name = "schedule-actor")
        val storeActorRef = system.actorOf(Props[PageStoreActor], name = "store-actor")
        val crawlActorRef = system.actorOf(Props[CrawlActor], name = "crawl-actor")

        // The seed URLs were lost from the original listing; this value is a placeholder.
        val links = "http://example.com/index.html"
        val seeds: Seq[String] = links.split("\\s+").toSeq
        ScheduleActor.sendFeeds(crawlActorRef, seeds)
      }
    }

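Here we create an ActorSystem instance containing three Actor instances: ScheduleActor, PageStoreActor, and CrawlActor. Within this ActorSystem, an Actor's reference (ActorRef) can be looked up by the Actor's name, and messages are then sent to that reference to communicate.
The ScheduleActor companion object takes a reference to a CrawlActor and sends the given seed links to that CrawlActor in the system. The companion object is implemented as follows: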

    import akka.actor.ActorRef

    object ScheduleActor {

      def sendFeeds(crawlerActorRef: ActorRef, seeds: Seq[String]): Unit = {
        seeds.foreach(crawlerActorRef ! _)
      }
    }

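The code above is simple: it iterates over the collection of seed links and sends each one to the CrawlActor.
CrawlActor is an Actor that processes link information; it is also responsible for downloading page data and extracting the outlinks. It is implemented as follows: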

    import java.io.ByteArrayOutputStream
    import java.net.{HttpURLConnection, URL}
    import java.util.concurrent.{ForkJoinPool, LinkedBlockingQueue}
    import akka.actor.{Actor, ActorLogging, Props}
    import scala.concurrent.{ExecutionContext, Future}

    // Structural lines lost from the original listing (def receive, try/catch,
    // closing braces, and a few assignments) are reconstructed from context.
    class CrawlActor extends Actor with ActorLogging {

      private val scheduleActor = context.actorOf(Props[ScheduleActor], "schedule_actor")
      private val storeActor = context.actorOf(Props[PageStoreActor], "store_actor")
      private val q = new LinkedBlockingQueue[String]()
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

      def receive = {
        case link: String =>
          if (link != null && link.startsWith("http://")) { // simple validity check
            log.info("Checked: " + link)
            scheduleActor ! WebUrl(link)
          }

        case ScheduledWebUrl(link, _) =>
          // Download and parse the page asynchronously.
          val crawlFuture = Future {
            try {
              var encoding = "utf-8"
              var outlinks: Set[String] = Set[String]()
              val u = new URL(link)
              val domain = u.getHost
              val uc = u.openConnection().asInstanceOf[HttpURLConnection]
              uc.setConnectTimeout(5000)
              uc.connect()
              if (uc.getResponseCode == 200) {
                // Page encoding. Note that getContentEncoding returns the Content-Encoding
                // header (for example gzip), which is not necessarily a charset name.
                if (uc.getContentEncoding != null) {
                  encoding = uc.getContentEncoding
                }
                // Page content
                if (uc.getContentLength > 0) {
                  val in = uc.getInputStream
                  val buffer = Array.fill[Byte](512)(0)
                  val baos = new ByteArrayOutputStream
                  var bytesRead = in.read(buffer)
                  while (bytesRead > -1) {
                    baos.write(buffer, 0, bytesRead)
                    bytesRead = in.read(buffer)
                  }
                  in.close()
                  outlinks = extractOutlinks(link, baos.toString(encoding))
                }
              }
              log.info("Page: link=" + link + ", encoding=" + encoding + ", outlinks=" + outlinks)
              CrawledWeb(link, domain, encoding, uc.getContentLength, outlinks)
            } catch {
              case e: Throwable =>
                log.error("Crawl error: " + e.toString)
                throw e
            }
          }

          crawlFuture.onSuccess {
            case crawledWeb: CrawledWeb =>
              log.info("Succeed to crawl: link=" + link + ", crawledWeb=" + crawledWeb)
              if (crawledWeb != null) {
                storeActor ! crawledWeb
                log.info("Sent crawled data to store actor.")
              }
          }

          crawlFuture.onFailure {
            case exception: Throwable => log.error("Fail to crawl: " + exception.toString)
          }

        case Stored(link, count) =>
          // Forward the result to the schedule actor.
          scheduleActor ! ((link, count))
      }

      // Extracts absolute links to .html/.htm pages from the page content.
      def extractOutlinks(parentUrl: String, content: String): Set[String] = {
        val outlinks = "href\\s*=\\s*\"([^\"]+)\"".r.findAllMatchIn(content).map { m =>
          var url = m.group(1)
          if (!url.startsWith("http")) {
            url = new URL(new URL(parentUrl), url).toExternalForm
          }
          url
        }.toSet
        outlinks.filter(url => !url.isEmpty && (url.endsWith("html") || url.endsWith("htm")))
      }
    }

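The link extraction step of CrawlActor can be tried on its own, without any Actor. The sketch below reuses the regular expression and suffix filter from extractOutlinks, but resolves relative links with java.net.URI rather than java.net.URL. The object name OutlinkExtractor and the sample input in the note that follows are made up for illustration.

```scala
import java.net.URI

object OutlinkExtractor {

  // Matches href="..." attributes, allowing whitespace around the equals sign.
  private val Href = "href\\s*=\\s*\"([^\"]+)\"".r

  // Returns the absolute .html/.htm links found in `content`,
  // resolving relative links against `parentUrl`.
  def extract(parentUrl: String, content: String): Set[String] = {
    val base = URI.create(parentUrl)
    Href.findAllMatchIn(content)
      .map(m => m.group(1))
      .map(href => if (href.startsWith("http")) href else base.resolve(href).toString)
      .filter(url => url.endsWith("html") || url.endsWith("htm"))
      .toSet
  }
}
```

For a page at http://example.com/index.html containing the links news/today.html, http://example.org/about.htm, and /files/archive.zip, extract returns the first two as absolute URLs and drops the zip file. Links containing characters that are illegal in a URI (such as spaces) would make resolve throw, so a production version would need to guard against that.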
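One point worth explaining: there are two ways to use a Future's callbacks:
- The first is to use the onSuccess and onFailure methods. This requires deciding, in the Future's body, what type of value is returned when processing succeeds. There may be several kinds of success, for example the page was downloaded, or the download could not proceed because no proxy was used but the task still ended normally; these are handled by separate case clauses in onSuccess. If the code in the Future fails, the exception is handled in onFailure. For example, if a page is unreachable because no proxy was used, a proxy address can be chosen there and the link sent back to the ScheduleActor to be scheduled for download again.
- The second is to use the onComplete method, which handles both the success and the failure of the Future in one place. Choose whichever style you prefer.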
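As a minimal sketch of the second style, the following uses only the Scala standard library. The object CallbackDemo, the function parseLength, and its inputs are hypothetical stand-ins for the crawl task. Note that onSuccess and onFailure were deprecated in Scala 2.12 and removed in Scala 2.13, so on newer Scala versions onComplete is the usual way to register such callbacks.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object CallbackDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for a crawl task: succeeds with a length, or fails with an exception.
  def parseLength(raw: String): Future[Int] = Future { raw.trim.toInt }

  // Registers a single callback that handles both outcomes,
  // then returns the text that the callback produced.
  def describe(raw: String): String = {
    val done = new CountDownLatch(1)
    val message = new AtomicReference[String]("")
    parseLength(raw).onComplete {
      case Success(n) =>
        message.set("length=" + n)
        done.countDown()
      case Failure(e) =>
        message.set("failed: " + e.getClass.getSimpleName)
        done.countDown()
    }
    // Block only so that this demo can return the callback's result.
    done.await(5, TimeUnit.SECONDS)
    message.get
  }
}
```

Here describe("42") yields "length=42", while describe("abc") yields "failed: NumberFormatException". Inside an Actor the callback would send a message instead of blocking on a latch.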
The code above uses the following implicit definition:

    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

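This declares an implicit ExecutionContext value, ec, which supplies the thread pool that runs the tasks defined in a Future. It is also worth looking at the Future's onSuccess method: it is a curried method (one with multiple parameter lists) whose second, implicit parameter list takes an ExecutionContext, as shown below: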

    def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
      case Success(v) =>
        pf.applyOrElse[T, Any](v, Predef.conforms[T])
      case _ =>
    }

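Passing an ExecutionContext instance explicitly on every call would make the code quite ugly, so defining an implicit ExecutionContext keeps it clear and readable. The definition of onSuccess above also shows that it is implemented by calling onComplete.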
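To make the difference concrete, here is a small sketch that passes the ExecutionContext both ways. ImplicitEcDemo and its method names are hypothetical; only Future.apply and map, with their implicit ExecutionContext parameter lists, come from the Scala standard library.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object ImplicitEcDemo {

  // Explicit style: the context is spelled out in the second parameter list of every call.
  def explicitStyle(x: Int, pool: ExecutionContext): Future[Int] =
    Future(x + 1)(pool).map(_ * 2)(pool)

  // Implicit style: the compiler fills in the same parameter lists from the implicit in scope.
  def implicitStyle(x: Int)(implicit pool: ExecutionContext): Future[Int] =
    Future(x + 1).map(_ * 2)

  // Runs both styles on a small dedicated pool and returns the two results.
  def run(x: Int): (Int, Int) = {
    val pool: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      val a = Await.result(explicitStyle(x, pool), 5.seconds)
      val b = Await.result(implicitStyle(x)(pool), 5.seconds)
      (a, b)
    } finally {
      pool.shutdown()
    }
  }
}
```

Both styles compute the same value, so run(1) returns (4, 4); the only difference is how much plumbing appears at each call site.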
PageStoreActor is mainly responsible for persisting data; it operates on the MySQL database directly:
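
    import java.util.concurrent.ForkJoinPool
    import akka.actor.{Actor, ActorLogging}
    import scala.concurrent.{ExecutionContext, Future}

    // Structural lines lost from the original listing are reconstructed from context.
    class PageStoreActor extends Actor with ActorLogging {

      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

      def receive = {
        case CrawledWeb(link, domain, encoding, contentLength, outlinks) =>
          // Capture the sender before entering the Future. The original created a
          // CrawlActor child here, which in turn creates another PageStoreActor,
          // so this version replies to the sending CrawlActor instead.
          val crawlerRef = sender
          val future = Future {
            var sqls = Set[String]()
            val createTime = DatetimeUtils.format(System.currentTimeMillis)
            // Values are concatenated unescaped, as in the original.
            val sql = "INSERT INTO web_link VALUES ('" + link + "','" + domain + "','" + encoding + "'," + contentLength + ",'" + createTime + "')"
            log.info("Link SQL: " + sql)
            sqls += sql
            if (outlinks.nonEmpty) { // an empty VALUES list would be invalid SQL
              var outlinksSql = "INSERT INTO web_outlink VALUES "
              outlinksSql += outlinks.map("('" + link + "','" + _ + "','" + createTime + "')").mkString(",")
              log.info("Outlinks SQL: " + outlinksSql)
              sqls += outlinksSql
            }
            MySQLUtils.doTrancation(sqls)
            (link, outlinks.size)
          }

          future.onSuccess {
            case (link: String, outlinkCount: Int) =>
              log.info("SUCCESS: link=" + link + ", outlinkCount=" + outlinkCount)
              crawlerRef ! Stored(link, outlinkCount)
          }

          future.onFailure {
            // Rethrowing inside a callback has no useful effect, so log the failure instead.
            case e: Throwable => log.error(e, "Failed to store: link=" + link)
          }
      }
    }
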
The code above is fairly straightforward.
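PageStoreActor builds its SQL by concatenating strings, so a link containing a quote character would break the statement. A hedged alternative, which is not the article's code, is to use JDBC prepared statements with batching. In the sketch below, the object SafeStore and its methods are hypothetical; the column order follows the web_link and web_outlink definitions above, and the java.sql.Connection is assumed to be supplied by the caller (for example from MySQLUtils.getConnection).

```scala
import java.sql.Connection

object SafeStore {

  // One parameter row per outlink: (link, outlink, create_time), in a stable order.
  def outlinkRows(link: String, outlinks: Set[String], createTime: String): Seq[(String, String, String)] =
    outlinks.toSeq.sorted.map(outlink => (link, outlink, createTime))

  // Inserts the page and its outlinks in one transaction using bound parameters.
  def store(conn: Connection, link: String, domain: String, encoding: String,
            contentLength: Int, outlinks: Set[String], createTime: String): Unit = {
    conn.setAutoCommit(false)
    val linkStmt = conn.prepareStatement("INSERT INTO web_link VALUES (?, ?, ?, ?, ?)")
    val outStmt = conn.prepareStatement("INSERT INTO web_outlink VALUES (?, ?, ?)")
    try {
      linkStmt.setString(1, link)
      linkStmt.setString(2, domain)
      linkStmt.setString(3, encoding)
      linkStmt.setInt(4, contentLength)
      linkStmt.setString(5, createTime)
      linkStmt.executeUpdate()
      outlinkRows(link, outlinks, createTime).foreach { case (l, o, t) =>
        outStmt.setString(1, l)
        outStmt.setString(2, o)
        outStmt.setString(3, t)
        outStmt.addBatch()
      }
      outStmt.executeBatch()
      conn.commit()
    } catch {
      case e: Exception =>
        conn.rollback() // undo the partial insert before reporting the failure
        throw e
    } finally {
      linkStmt.close()
      outStmt.close()
    }
  }
}
```

Because the values are bound as parameters, quotes inside a URL are stored as data rather than interpreted as SQL, and a page with no outlinks simply results in an empty batch.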
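ScheduleActor is responsible for scheduling tasks: it sends links, annotated with scheduling information, to the CrawlActor, which downloads the corresponding pages. After a page has been downloaded and stored, ScheduleActor also collects the result and keeps it in an internal ConcurrentHashMap. It is implemented as follows: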
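
    import java.util.concurrent.ConcurrentHashMap
    import akka.actor.{Actor, ActorLogging}

    // Structural lines lost from the original listing are reconstructed from context.
    class ScheduleActor extends Actor with ActorLogging {

      // Scheduling information attached to every link.
      val config: Map[String, Any] = Map(
        "domain.black.list" -> Seq("google.com", "facebook.com", "twitter.com"),
        "crawl.retry.times" -> 3,
        "filter.page.url.suffixes" -> Seq(".zip", ".avi", ".mkv")
      )
      val counter = new ConcurrentHashMap[String, Int]()

      def receive = {
        case WebUrl(url) =>
          sender ! ScheduledWebUrl(url, config)
        case (link: String, count: Int) =>
          // Record how many outlinks were stored for this link.
          counter.put(link, count)
          log.info("Counter: " + counter.toString)
      }
    }
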
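We ran the example above on a single machine, but it can easily be extended to multiple machines. Building on the Akka Remoting example earlier, each Actor can run in its own process (several processes on one machine, or processes on different machines). Using a configuration file and a separate utility object that parses it, the application can create the Actors it needs to talk to (as ActorRef or ActorSelection) on demand; almost no other code needs to change.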
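As one possible shape for such a utility, the sketch below builds the actor path string that would be passed to context.actorSelection. It assumes the classic remoting TCP transport configured earlier, whose addresses have the form akka.tcp://system@host:port/user/name. The names RemotePaths and RemoteEndpoint and the setting keys are hypothetical, and the settings are passed as a plain Map so that the example needs nothing beyond the standard library; in a real application they would come from the parsed configuration file.

```scala
import scala.util.Try

object RemotePaths {

  // Describes where a remote top-level actor lives.
  final case class RemoteEndpoint(system: String, host: String, port: Int, actorName: String) {
    // Path of a top-level actor reachable over the akka.tcp transport.
    def path: String = s"akka.tcp://$system@$host:$port/user/$actorName"
  }

  // Reads one endpoint from flat string settings.
  // Returns None if a key is missing or the port is not a number.
  def fromSettings(settings: Map[String, String]): Option[RemoteEndpoint] =
    for {
      system <- settings.get("system")
      host   <- settings.get("hostname")
      port   <- settings.get("port").flatMap(p => Try(p.toInt).toOption)
      name   <- settings.get("actor")
    } yield RemoteEndpoint(system, host, port, name)
}
```

With the settings system=remote-system, hostname=127.0.0.1, port=2552, and actor=remoteActor, the resulting path is akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor, which matches the server configured in the remoting example.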
In addition, Akka's built-in routing strategies can be used to route messages.