Akka并发编程——第四节:Actor模型(三)

简介: 本将主要内容: 1. Actor引用、Actor路径1. Actor引用、Actor路径下图是Akka官方文档中给出的一张图该图清晰地说明了ActorPath,ActorRef,Actor及ActorSystem之间的关系,并说明了Actor整体的层次结构。前面我们提到,Akka应用程序会持有一个名称为user的Actor,该Actor被称为guardian

本将主要内容:
1. Actor引用、Actor路径

1. Actor引用、Actor路径

下图是Akka官方文档中给出的一张图

这里写图片描述

该图清晰地说明了ActorPath,ActorRef,Actor及ActorSystem之间的关系,并说明了Actor整体的层次结构。前面我们提到,Akka应用程序会持有一个名称为user的Actor,该Actor被称为guardian supervisor(守卫监督器),无论是ActorSystem创建的Actor还是通过ActorContext创建的Actor都为user的子类,它是最顶级的Actor。

(一)ActorRef

对于ActorRef,我们已经很熟悉了,通过调用ActorSystem.actorOf方法可以创建Actor,返回的便是ActorRef,例如代码

 //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

返回的便是FirstActor的ActorRef对象,ActorRef最重要的作用便是向Actor发送消息,例如

//向myactor发送消息
myactor!"test"
myactor! 123

另外,还可以通过context隐式对象获取父Actor和子Actor的ActorRef,示例代码如下:

/*
*Actor API:成员变量self及sender()方法的使用
*/
object Example_07 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    var parentActorRef:ActorRef=_
    override def preStart(): Unit ={
      //通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent
    }
    def receive = {
      case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  //获取ActorPath
  val myActorPath=system.child("firstActor")
  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)
  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor1!"test"
  myActor1! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果

[INFO] [04/02/2016 20:28:08.941] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 20:28:08.942] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from ParentActorRef
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

代码

 class MyActor extends Actor with ActorLogging{
    var parentActorRef:ActorRef=_
    override def preStart(): Unit ={
      //通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent
    }
    def receive = {
      case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
      case _      => log.info("received unknown message");
    }

  }

中,使用

//通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent

获取MyActor 的直接父Actor的ActorRef,在本例中为FirstActor,因为在FirstActor中,我们使用

 //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")

创建了MyActor,FirstActor便自动成为MyActor的直接Supervisor。如此便可以通过代码

parentActorRef!"message from ParentActorRef"

发送消息。

另外,还可以通过

//创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  //获取ActorPath
  val myActorPath=system.child("firstActor")
  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

system.actorSelection(myActorPath)方法获取相应的ActorRef。然后再使用获取到的ActorRef向Acto发送消息

 //向myActor1发送消息
  myActor1!"test"
  myActor1! 123

现在,让我们来总结一下获取ActorRef的方法:
(1)通过ActorSystem的actorOf方法,不过这种方式是通过创建Actor,然后返回其ActorRef
(2)通过context.actorOf方法,这种方式也是通过创建Actor,然后返回其ActorRef
(3)通过context.parent、context.self、context.children方法获取当前Actor的父Actor、当前Actor及子Actor的ActorRef,这种方式是获取已经创建好的Actor的ActorRef
(4)通过val myActor1=system.actorSelection(myActorPath)方法来获取ActorRef,这种方式也是获取已经创建好的Actor的ActorRef

(二)ActorPath

在前面的例子中,我们通过

 val myActorPath=system.child("firstActor")

已经使用到了ActorPath。在Akka中,ActorPath采用统一资源定位符的方式进行组织,例如

//本地ActorPath
"akka://my-sys/user/service-a/worker1" 
//远程ActorPath
"akka.tcp://my-sys@host.example.com:5678/user/service-b"

本地ActorPath是Akka并发编程中的ActorPath表示方式,而远程ActorPath是Akka分布式编程中常用的ActorPath表示方式,”akka.tcp://my-sys@host.example.com:5678/user/service-b”中的TCP表示使用的是TCP协议,也可以使用UPD协议,使用UDP协议的远程ActorPath表示方式有如下形式

"akka.udp://my-sys@host.example.com:5678/user/service-b"

ActorPath当中,akka.udp表示的是使用的协议,my-sys表示的是ActorSytem名称,host.example.com:5678为远程机器地址及端口,user为guardian supervisor,service-b为当前应用程序的顶级Actor(通过system.actorOf方法创建的)

/*
*ActorPath
*/
object Example_08 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //获取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("firstActorPath--->{}",firstActorPath)


  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor")


  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor2!"test"
  myActor2! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果:

[INFO] [04/02/2016 21:04:59.612] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor
[INFO] [04/02/2016 21:04:59.612] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 21:04:59.615] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

本例中的重点代码如下

//获取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("myActorPath--->{}",firstActorPath)


  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor")

通过 val firstActorPath=system.child(“firstActor”)构造一个ActorPath,然后使用

val myActor1=system.actorSelection(firstActorPath)

获取路径下的ActorRef,除这种方式外,还可以通过绝对路径 val myActor2=system.actorSelection(“/user/firstActor”)及相对路径 val myActor3=system.actorSelection(“../firstActor”)的方式获取ActorRef
,需要注意的是绝对路径使用的是guardian supevisor,即/user/firstActor的这种方式。

如果要获取myActor,则基方法是类型的,例如

//获取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)


  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor/myActor")

完整代码如下:

/*
*ActorPath,获取myActor
*/
object Example_09 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //获取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)


  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor/myActor")


  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor1!"test"
  myActor1! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果:

[INFO] [04/02/2016 21:21:14.377] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor/myActor
[INFO] [04/02/2016 21:21:14.377] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 21:21:14.381] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

关于远程ActorRef的获取,我们将在后续章节中进行讲述。



Scala学习(公众微信号:ScalaLearning)每天为大家带来一点Scala语言、Spark、Kafka、Flink、AKKA等大数据技术干货及相关技术资讯。技术永无止境,勇攀高峰,一往直前!
觉得文章不错?扫描关注
这里写图片描述

目录
相关文章
|
10月前
|
存储 JavaScript 中间件
Vuex 中间件和 Pinia 中间件的性能有何区别?
Vuex 中间件和 Pinia 中间件的性能有何区别?
332 74
|
8月前
|
机器学习/深度学习 人工智能 机器人
Meta AI Research:虚拟/可穿戴/机器人三位一体的AI进化路径
本文阐述了我们对具身AI代理的研究——这些代理以视觉、虚拟或物理形式存在,使其能够与用户及环境互动。这些代理包括虚拟化身、可穿戴设备和机器人,旨在感知、学习并在其周围环境中采取行动。与非具身代理相比,这种特性使它们更接近人类的学习与环境交互方式。我们认为,世界模型的构建是具身AI代理推理与规划的核心,这使代理能够理解并预测环境、解析用户意图及社会背景,从而增强其自主完成复杂任务的能力。世界建模涵盖多模态感知的整合、通过推理进行行动规划与控制,以及记忆机制,以形成对物理世界的全面认知。除物理世界外,我们还提出需学习用户的心理世界模型,以优化人机协作。
669 3
|
关系型数据库 数据库连接 数据库
Python执行PG数据库查询语句:以Markdown格式打印查询结果
使用Python的`psycopg2`和`pandas`库与PostgreSQL交互,执行查询并以Markdown格式打印结果。首先确保安装所需库:`pip install psycopg2 pandas`。接着建立数据库连接,执行查询,将查询结果转换为DataFrame,再用`tabulate`库将DataFrame格式化为Markdown。代码示例包括连接函数、查询函数、转换和打印函数。最后限制列宽以适应输出。
|
12月前
|
弹性计算 运维 监控
操作系统控制台-健康守护我们的系统
阿里云操作系统控制平台作为新一代云端服务器中枢平台,通过创新交互模式重构主机管理体验。用户可通过API、SDK、CLI等方式进行系统管理,采用图形化控制替代传统命令行操作,集智能运维、集群协调、生态扩展于一体,显著提升企业级IT设施管理效能。通过此平台,用户可以轻松实现运维监控、智能助手、扩展插件管理及订阅服务等功能,大幅降低运维复杂度,提高管理效率。
305 11
|
传感器 小程序 开发工具
【规范】小程序发布,『小程序隐私保护指引』填写指南
本文详细解析了小程序隐私保护指引的填写方法,包括开发者处理的信息、第三方插件信息、用户权益等内容,并提供了详细的填写范例,帮助读者在发布小程序时避免常见问题,是一份实用的参考指南。
2559 1
【规范】小程序发布,『小程序隐私保护指引』填写指南
|
运维 监控 安全
linux日志分析与追踪
在Linux中,日志分析涉及检查 `/var/log` 下的不同文件,如`messages`、`auth.log`、`kern.log`等,以及Web服务器和数据库日志。使用`tail`、`grep`、`awk`等工具实时查看和搜索日志,`logrotate`管理日志大小,`journalctl`处理Systemd日志,而`Splunk`等工具则用于集中式分析。分析技巧包括异常检测、时间关联和阈值监控。安全事件追踪结合登录失败日志、网络嗅探和IDS/IPS。日志链路追踪在分布式系统中尤为重要,帮助定位服务调用问题。有效的日志管理和分析能增强系统安全和故障排除能力。
275 7
|
异构计算 机器学习/深度学习 算法
探索FPGA在硬件加速中的应用
【5月更文挑战第31天】本文探讨了FPGA在硬件加速中的应用,阐述了FPGA基于可编程逻辑单元和连接资源实现高效并行处理的优势,如高性能、低功耗、可重构性和灵活性。FPGA广泛用于图像处理、数据压缩、深度学习加速和网络安全等领域。然而,FPGA也面临功耗、散热及开发复杂度的挑战。未来,FPGA将通过优化设计和工具,与CPU、GPU等协同工作,助力异构计算和新兴技术发展。
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
2397 2
|
Java Apache
Java将word、excel文件转成pdf文件
【5月更文挑战第26天】Java将word、excel文件转成pdf文件
3448 1
|
消息中间件 存储 缓存
RabbitMQ与Kafka选型对比(一)
RabbitMQ与Kafka选型对比(一)
4997 0
RabbitMQ与Kafka选型对比(一)