开发者社区> shiyanjuncn> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Akka入门编程实践

简介:
+关注继续查看

Akka是使用Scala语言开发一个编程库,基于事件驱动的架构实现异步处理,它能够简化编写分布式应用程序。Akka中最核心的概念是Actor模型,它为编写分布式/并行计算应用程序提供了高层次抽象,在实际编程实践中,开发人员可以从对复杂网络通信细节的处理、多线程应用场景下对锁的管理中解脱出来。
Akka能够给应用程序带来的几个重要的特性是:

  • 容错性
  • 可伸缩性
  • 异步性
  • 事件驱动架构(EDA)
  • 远程透明性

Actor是Akka中最核心的组件,以至于我们在编写基于Akka的应用程序时,大部分时间都会和Actor打交道,那么Actor到底是怎样的一种抽象呢?一个Actor对象封装了状态和行为,但是它不和外界其它的Actor共享状态,如果一个Actor想要和另一个Actor交互,能且只能通过发送消息来达到信息交换的目的。可见,一个Actor能够很好地保护其内部状态的安全。

与本地Actor通信

下面,我们从最简单的Actor编程来体验Akka的功能。首先,先定义几种类型的消息,后面会基于这些消息来进行通信,代码如下所示:

01 package org.shirdrn.scala.akka
02
03 object Start extends Serializable
04 object Stop extends Serializable
05
06 trait Message {
07 val id: String
08 }
09
10 case class Shutdown(waitSecs: Int) extends Serializable
11 case class Heartbeat(id: String, magic:Int) extends Message with Serializable
12 case class Header(id: String, len: Int, encrypted: Boolean) extends Message withSerializable
13 case class Packet(id: String, seq: Long, content: String) extends Message withSerializable

要实现一个Actor,需要继承自特质akka.actor.Actor,然后需要实现Actor特质声明的receive方法即可。另外,可选地可以混入另一个特质akka.actor.ActorLogging,提供记录日志的功能。我们首先实现的是一个Actor对象,然后拿到该Actor的一个引用(ActorRef),通过发送消息来与其进行交互,实现的Actor类为LocalActor ,代码如下所示:

01 class LocalActor extends Actor with ActorLogging {
02
03 def receive = {
04 case Start => log.info("start")
05 case Stop => log.info("stop")
06 case Heartbeat(id, magic) => log.info("Heartbeat" + (id, magic))
07 case Header(id, len, encrypted) => log.info("Header" + (id, len, encrypted))
08 case Packet(id, seq, content) => log.info("Packet" + (id, seq, content))
09 case _ =>
10 }
11 }

然后,实现一个带有main方法的类来与上面的LocalActor对象:

01 object LocalClient extends App {
02 // Local actor
03 val system= ActorSystem("local-system") // 创建一个ActorSystem对象,用来管理Actor实例
04 println(system )
05 val localActorRef =system.actorOf(Props(new LocalServer()), name="local-actor") // 通过ActorSystem对象,获取到一个Actor的引用
06 println(localActorRef)
07
08 localActorRef ! Start // 向LocalActor发送Start消息
09 localActorRef ! Heartbeat("3099100", 0xabcd) // 向LocalActor发送Heartbeat消息
10
11 // 创建一个JSON类型的消息Packet
12 val content = new JSONObject()
13 content.put("name", "Stone")
14 content.put("empid", 51082001)
15 content.put("score", 89.36581)
16 localActorRef ! Packet("3000001", System.currentTimeMillis(), content.toString) // 向LocalActor发送Packet消息
17
18 localActorRef ! Stop // 停止LocalActor实例
19 system shutdown // 终止ActorSystem对象,释放资源
20 }

虽然,我们只实现了一个本地Actor,但是这也非常有用,比如,我们在同一个JVM中有多个模块之间需要通过消息通信,完全可以实现多个本地Actor,他们之间进行通信,完成复杂的处理逻辑。

与远程Actor通信

在分布式应用场景中,通常需要跨节点进行通信,或者说交换消息,那么在使用Akka实现的时候就被抽象为在不同节点之上的多个Actor之间的交互。因为Akka提供的高层次抽象,所以在使用Akka编写分布式应用程序的时候,和编写本地应用程序一样简单。下面,我们实现一个伪分布式应用程序,使Actor在不同的JVM之内进行通信,实现上和在不同的节点上是一样的。
我们使用配置文件application.conf来指定通信处理过程中相关Actor的配置,包括远程Actor的主机名(或IP地址)和端口,包括本地Actor的基本配置。然后,只需要将该文件放在CLASSPATH之下即可,Akka会使用typesafe提供的配置解析工具ConfigFactory类来进行处理,配置文件application.conf中配置内容如下所示:

01 MyRemoteServerSideActor {
02 akka {
03 actor {
04 provider = "akka.remote.RemoteActorRefProvider"
05 }
06 remote {
07 enabled-transports = ["akka.remote.netty.tcp"]
08 netty.tcp {
09 hostname = "127.0.0.1"
10 port = 2552
11 }
12 }
13 }
14 }
15
16 MyRemoteClientSideActor {
17 akka {
18 actor {
19 provider = "akka.remote.RemoteActorRefProvider"
20 }
21 }
22 }

上面,MyRemoteServerSideActor指定了远程Actor的配置内容,Actor的provider配置为akka.remote.RemoteActorRefProvider,TPC通信配置的主机名为127.0.0.1,端口为2552;MyRemoteClientSideActor指定了本地Actor的配置,Actor的provider配置为akka.remote.RemoteActorRefProvider,下面看看代码实现。

  • 远程Actor实现

实现远程Actor和实现一个本地Actor的方式是一样的,继承自特质Actor,并实现receive方法。我们实现的RemoteActor的代码如下所示:

01 class RemoteActor extends Actor with ActorLogging {
02
03 // 模拟处理结果状态,发送给消息的发送方
04 val SUCCESS = "SUCCESS"
05 val FAILURE = "FAILURE"
06
07 def receive = {
08 case Start => { // 处理Start消息
09 log.info("RECV event: " + Start)
10 }
11 case Stop => { // 处理Stop消息
12 log.info("RECV event: " + Stop)
13 }
14 case Shutdown(waitSecs) => { // 处理Shutdown消息
15 log.info("Wait to shutdown: waitSecs=" + waitSecs)
16 Thread.sleep(waitSecs)
17 log.info("Shutdown this system.")
18 context.system.shutdown // 停止当前ActorSystem系统
19 }
20 case Heartbeat(id, magic) => log.info("RECV heartbeat: " + (id, magic)) // 处理Heartbeat消息
21 case Header(id, len, encrypted) => log.info("RECV header: " + (id, len, encrypted)) // 处理Header消息
22 case Packet(id, seq, content) => { // 处理Packet消息
23 val originalSender = sender // 获取到当前发送方的Actor引用
24 log.info("RECV packet: " + (id, seq, content))
25 originalSender ! (seq, SUCCESS) // 响应给发送方消息处理结果,类似发送一个ACK
26 }
27 case _ =>
28 }
29 }

上面的Actor实现了接收多种类型的消息:Start、Stop、Shutdown、Heartbeat、Header、Packet,其中一个Shutdown消息是可以将当前远程ActorSystem系统终止的,终止后就无法再处理任何请求,而Packet消息则会给发送方一个返回,告知处理结果。
一个Actor可以在自己内部终止自己,需要通过执行context.system.shutdown就可以实现。
启动我们实现的远程Actor系统,等待接收并处理消息,如下所示:

1 object AkkaServerApplication extends App {
2
3 val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容
4 val log = system.log
5 log.info("Remote server actor started: " + system)
6
7 system.actorOf(Props[RemoteActor], "remoteActor") // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值
8
9 }

这里是程序的主入口,启动改程序可以看到控制台输出如下内容:

1 [INFO] [08/14/2015 11:52:45.747] [main] [Remoting] Starting remoting
2 [INFO] [08/14/2015 11:52:46.230] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
3 [INFO] [08/14/2015 11:52:46.232] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
4 [INFO] [08/14/2015 11:52:46.239] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system

可以看出,这与我们在配置文件,以及在代码中配置的内容相一致:ActorSystem系统名称为remote-system,通信端口为127.0.0.1:2552。

  • 客户端Actor实现

我们再看本地将要与远程Actor通信的客户端Actor的实现,如下所示:

01 class ClientActor extends Actor with ActorLogging {
02
03 // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
04 val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用
05 val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息
06
07 @volatile var connected = false
08 @volatile var stop = false
09
10 def receive = {
11 case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据
12 send(Start)
13 if(!connected) {
14 connected = true
15 log.info("Actor connected: " + this)
16 }
17 }
18 case Stop => {
19 send(Stop)
20 stop = true
21 connected = false
22 }
23 case header: Header => send(header)
24 case hb: Heartbeat => sendWithCheck(hb)
25 case pkt: Packet => sendWithCheck(pkt)
26 case cmd: Shutdown => send(cmd)
27
28 case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果
29 case m => log.info("Unknown message: " + m)
30 }
31
32 private def sendWithCheck(cmd: Serializable): Unit = {
33 while(!connected) {
34 Thread.sleep(100)
35 log.info("Wait to be connected...")
36 }
37 if(!stop) {
38 send(cmd)
39 } else {
40 log.warning("Actor has stopped!")
41 }
42 }
43
44 private def send(cmd: Serializable): Unit = {
45 log.info("Send command to server: " + cmd)
46 try {
47 remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输
48 } catch {
49 case e: Exception => {
50 connected = false
51 log.info("Try to connect by sending Start command...")
52 send(Start)
53 }
54 }
55 }
56
57 }

本地Actor会接收处理本地(当前JVM中)发送过来的消息,一个简单的check,然后进行转发,发送到远程Actor;也用来接收来自远程Actor响应的处理结果。接收并转发本地消息,包括如下类型消息:Start、Stop、Shutdown、Header、Heartbeat、Packet。其中,我们会在本地客户端创建一个单独的线程去周期性地发送心跳消息Heartbeat到远程Actor,同时将大量的Packet消息发送到远程Actor去处理。接收到的远程Actor响应的消息是一个Tuple类型,可以提取出seq和result数据,查看某个消息处理结果。下面是本地客户端的实现逻辑,如下所示:

01 object AkkaClientApplication extends App {
02
03 val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) // 通过配置文件application.conf配置创建ActorSystem系统
04 val log = system.log
05 val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用
06 @volatile var running = true
07 val hbInterval = 1000
08
09 lazy val hbWorker = createHBWorker
10
11 /**
12 * create heartbeat worker thread
13 */
14 def createHBWorker: Thread = { // 心跳发送线程
15 new Thread("HB-WORKER") {
16 override def run(): Unit = {
17 while(running) {
18 clientActor ! Heartbeat("HB", 39264)
19 Thread.sleep(hbInterval)
20 }
21 }
22 }
23 }
24
25 def format(timestamp: Long, format: String): String = {
26 val df = new SimpleDateFormat(format)
27 df.format(new Date(timestamp))
28 }
29
30 def createPacket(packet: Map[String, _]): JSONObject = {
31 val pkt = new JSONObject()
32 packet.foreach(p => pkt.put(p._1, p._2))
33 pkt
34 }
35
36 val ID = new AtomicLong(90760000)
37 def nextTxID: Long = {
38 ID.incrementAndGet()
39 }
40
41 clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发)
42 Thread.sleep(2000)
43
44 clientActor ! Header("HEADER", 20, encrypted=false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发)
45 Thread.sleep(2000)
46
47 hbWorker.start // 启动心跳线程
48
49 // send some packets
50 val DT_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
51 val r = Random
52 val packetCount = 100
53 val serviceProviders = Seq("CMCC", "AKBBC", "OLE")
54 val payServiceProvicers = Seq("PayPal", "CMB", "ICBC", "ZMB", "XXB")
55
56 def nextProvider(seq: Seq[String]): String = {
57 seq(r.nextInt(seq.size))
58 }
59
60 val startWhen = System.currentTimeMillis()
61 for(i <- 0 until packetCount) { // 持续发送packetCount个Packet消息
62 val pkt = createPacket(Map[String, Any](
63 "txid" -> nextTxID,
64 "pvid" -> nextProvider(serviceProviders),
65 "txtm" -> format(System.currentTimeMillis(), DT_FORMAT),
66 "payp" -> nextProvider(payServiceProvicers),
67 "amount" -> 1000 * r.nextFloat()))
68 clientActor ! Packet("PKT", System.currentTimeMillis, pkt.toString)
69 }
70 val finishWhen = System.currentTimeMillis()
71 log.info("FINISH: timeTaken=" + (finishWhen - startWhen) + ", avg=" + packetCount/(finishWhen - startWhen))
72
73 Thread.sleep(2000)
74
75 // ask remote actor to shutdown
76 val waitSecs = hbInterval
77 clientActor ! Shutdown(waitSecs) // 发送Packet消息完成,通知远程Actor终止服务
78
79 running = false
80 while(hbWorker.isAlive) { // 终止心跳线程
81 log.info("Wait heartbeat worker to exit...")
82 Thread.sleep(300)
83 }
84 system.shutdown // 终止本地ActorSystem系统
85 }

上面代码中有详细注释,可以了解具体实现。

使用Akka Future实例

前面的两种情况,我们模拟了Actor如果在本地/远程的上下文中进行通信处理,Akka很好地屏蔽了底层网络通信细节。接下来我们看看看Akka的Future功能,尤其是Future所支持异步Callback特性。
我们基于Akka实现的例子,如下图所示:

eaf2f9d1e55cd2ed7781f74fd70dbc66c28254c1

上图模拟了一个简易的有趣的爬虫系统,而且在这上面为了演示Akka的使用,我们在各个Actor之间增加了好多消息通信,可以根据上图中箭线上的编号来理解整个实例系统的执行流程。
存储网页链接,以及一个指定网页的出链接(Outlink)信息,我们使用MySQL数据库,创建了2个数据表,数据库及其表定义如下所示:

01 GRANT ALL ON *.* TO 'web'@'%' IDENTIFIED BY 'web';
02
03 CREATE DATABASE `page_db` DEFAULT CHARACTER SET utf8;
04 -- 用来存储一个链接,以及该链接对应的页面的相关信息
05 CREATE TABLE `web_link` (
06 `link` CHAR(128) NOT NULL UNIQUE,
07 `domain` VARCHAR(64) NOT NULL,
08 `encoding` VARCHAR(11) NOT NULL DEFAULT 'utf-8',
09 `content_length` INT NOT NULL DEFAULT 0,
10 `create_time` VARCHAR(20) NOT NULL,
11 PRIMARY KEY (`link`)
12 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
13
14 -- 用来存储一个链接的outlink信息
15 CREATE TABLE `web_outlink` (
16 `link` CHAR(128) NOT NULL,
17 `outlink` VARCHAR(128) NOT NULL,
18 `create_time` VARCHAR(20) NOT NULL,
19 PRIMARY KEY (`link`, `outlink`)
20 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

另外,Akka中Actor之间通过发送消息进行通信,所以我们首先定义几个case class,如下所示:

1 case class WebUrl(link: String)
2 case class ScheduledWebUrl(link: String, config: Map[String, Any])
3 case class CrawledWeb(link: String, domain: String, encoding: String, contentLength:Int, outlinks: Set[String])
4 case class Stored(link: String, outlinkCount: Int)

还有,操作MySQL以及日期时间转换操作,我们实现了两个工具类,如下所示:

01 object MySQLUtils {
02
03 val driverClass = "com.mysql.jdbc.Driver"
04 val jdbcUrl = "jdbc:mysql://10.10.4.130:3306/page_db"
05 val user = "web"
06 val password = "web"
07
08 try {
09 Class.forName(driverClass)
10 } catch {
11 case e: ClassNotFoundException => throw e
12 case e: Exception => throw e
13 }
14
15 @throws(classOf[SQLException])
16 def getConnection: Connection = {
17 DriverManager.getConnection(jdbcUrl, user, password)
18 }
19
20 @throws(classOf[SQLException])
21 def doTrancation(transactions: Set[String]) : Unit = {
22 val connection = getConnection
23 connection.setAutoCommit(false)
24 transactions.foreach {
25 connection.createStatement.execute(_)
26 }
27 connection.commit
28 connection.close
29 }
30 }
31
32 object DatetimeUtils {
33
34 val DEFAULT_