讨喜的隔离可变性(四)收发消息

简介:

我们可以向角色发送任何类型的消息——String、Integer、Long、Double、List、Map、元组(tuples)、Scala的case类…但其中有一点需要注意的是,上述所有类型的消息都必须是不可变的。在上述这些类型中,我对于元组有着特殊的偏好,这并非因为我听到别人把元组误读成“two-ples”时感到很有趣,而是由于元组是轻量的、不可变的并且是最容易创建的实例之一。例如,在Scala中,我们可以简单地用(number1,number2)来创建一个含有两个数字的元组。除了元组之外,Scala的case类也是用来定义消息的理想类型——因为case类是不可变的、可以进行模式匹配并且还很容易进行复制。在Java中,我们可以通过将消息定义为一个不可修改(unmodifiable)的Collection的方式来将多个对象塞到一个消息中。当我们向角色传递消息时,如果发送者和接收者都在同一个JVM里[1],则默认情况下我们传递的是消息的引用。需要注意的是,保证所传递消息的不可变性是程序员自己的责任,尤其是当所发送的消息是我们自定义的类时则更需要加倍小心。为了解决这个问题,我们可以让Akka替我们先将消息序列化,然后将序列化出来的拷贝而不是原对象的引用发送出去,这样就可以避免由于类定义不严谨所造成的问题。

与角色交互最简单的方式莫过于“发送并忘记(fire and forget)”,即先将消息发出去,然后不等响应继续做下面的事。这种做法从性能角度考虑也是最好的选择。其中,发送动作是由调用方角色/线程发起的一个非阻塞操作。我们可以使用sendOneWay()函数或Scala的!函数来发送一个单向消息。

除了“发送并忘记”交互模式之外,Akka还提供了双向消息交互模式,以应对我们在发出消息之后需要等待对端角色响应的情况。在这种模式下,调用线程将被阻塞,直至收到对方响应或达到超时时间为止。下面让我们一起来看看如何在Java和Scala中收发消息。

在Java中收发消息

我们可以通过sendRequestReply()函数来发送消息并等待接收方响应。如果接收方的响应未在(可配置的)超时时间内到达,则系统将抛出ActorTimeoutException异常。下面让我们通过一个示例来学习这种双向消息通信方式:


01 public class FortuneTeller extends UntypedActor {
02 public void onReceive(final Object name) {
03 getContext().replyUnsafe(String.format("%s you'll rock", name));
04 }
05 public static void main(final String[] args) {
06 final ActorRef fortuneTeller =
07 Actors.actorOf(FortuneTeller.class).start();
08 try {
09 final Object response = fortuneTeller.sendRequestReply("Joe");
10 System.out.println(response);
11 } catch(ActorTimeoutException ex) {
12 System.out.println("Never got a response before timeout");
13 } finally {
14 fortuneTeller.stop();
15 }
16 }
17 }

在上面的代码中,我们定义了一个名为FortuneTeller的角色,它对收到的消息都会直接进行回复。为了发送回复消息,我们需要先调用getContext()函数获取调用上下文,然后再调用其replyUnsafe()函数来发送消息内容。调用replyUnsafe()函数所执行的发送动作是非阻塞的,并且请注意,在发送响应消息的过程中我们没有调用任何与角色有关的代码。在main()函数中我们调用了sendRequestReply()函数,该函数会在内部创建一个Future类并等待对方响应或超时抛出异常。下面让我们通过运行上述代码来看看Joe的命运如何:

Joe you’ll rock

我们上面实现的这个FortuneTeller实际上还有个问题没解决,即该角色依赖于消息发送者的发送方式。当消息发送方调用sendRequestReply()函数时,该函数会创建一个内部的Future用于等待对方响应。而如果我们换用sendOneWay()来发送消息的话,则replyUnsafe()函数将会失败。为了避免这种情况的发生,我们需要在调用replyUnsafe()函数之前先检查一下是否能匹配到一个处于阻塞状态的发送者。我们可以通过从上下文中读取发送者引用的方式来进行这个检查,也可以通过replySafe()函数的返回值来进行判断。因为当能取到发送者的引用时该函数会返回true,反之则返回false。下面我们就着手对FortuneTeller进行修改,以使其可以处理发送者没有阻塞地等待响应消息的情况:


01 public class FortuneTeller extends UntypedActor {
02 public void onReceive(final Object name) {
03 if(getContext().replySafe(String.format("%s you'll rock", name)))
04 System.out.println("Message sent for " + name);
05 else
06 System.out.println("Sender not found for " + name);
07 }
08 public static void main(final String[] args) {
09 final ActorRef fortuneTeller =
10 Actors.actorOf(FortuneTeller.class).start();
11 try {
12 fortuneTeller.sendOneWay("Bill");
13 final Object response = fortuneTeller.sendRequestReply("Joe");
14 System.out.println(response);
15 } catch(ActorTimeoutException ex) {
16 System.out.println("Never got a response before timeout");
17 } finally {
18 fortuneTeller.stop();
19 }
20 }
21 }

如上所示,新版的FortuneTeller代码很优雅地处理了我们之前提到的那些问题,即使接收方没找到发送者也不会导致处理失败。


1 Sender not found for Bill
2 Message sent for Joe
3 Joe you'll rock

我们知道,sendRequestReply()函数是需要等待对方响应的阻塞式函数,而sendOneWay()函数则是单向且非阻塞的。而如果既想要接收响应又不想被阻塞,则可以使用更复杂一些的sendRequestReplyFuture()函数。该函数可以返回一个Future对象,而拿到Future对象之后我们就可以继续干其他的事,直到我们真正需要用到对方的响应时,再选择阻塞式地等待或通过之前拿到的那个Future对象来查询对方的响应是否已经可用。类似地,在角色这一侧我们可以从上下文引用中取到senderFuture,并通过它来与发送方进行通信。在后面的示例中,我们会看到上述这些函数的具体用法,这里就不再赘述了。

请务必谨慎使用sendRequestReply()和sendRequestReplyFuture()函数,因为这两个函数都是阻塞的,所以调用它们对程序的性能和可扩展性都会造成负面影响。

在Scala中收发消息

如果想要在Scala中与角色进行消息收发,我们需要有些心理准备,因为在Scala中我们所采用的方法将会与Java API有不小的差别:

在Scala中,我们可以直接使用self属性来访问actor。通过该属性,我们可以调用reply()函数或replySafe()函数,其中reply()就是replyUnsafe()在Scala侧的等价函数。

在Scala中,我们既可以调用sendRequestReply()函数,也可以调用更优雅的 !! 函数——当然美是人者见仁智者见智的。同样地,!!!也可以被用来替换sendRequestReplyFuture()函数。

在Scala中,sendRequestReply()函数不再返回一个Object,而是返回一个Scala的Option。当接收方的响应抵达时,这个Option将是一个Some[T]的实例。该实例中存有响应的具体内容,而在超时的情况下则响应内容为None。所以,与Java版本所不同的是,在Scala中调用sendRequestReply()函数在超时的时候不会抛异常。

下面让我们先用不安全的reply()函数实现Scala版的FortuneTeller:


01 class FortuneTeller extends Actor {
02 def receive = {
03 case name : String =>
04 self.reply(String.format("%s you'll rock", name))
05 }
06 }
07 176 • Chapter 8. Favoring Isolated Mutability
08 object FortuneTeller {
09 def main(args : Array[String]) : Unit = {
10 val fortuneTeller = Actor.actorOf[FortuneTeller].start()
11 val response = fortuneTeller !! "Joe"
12 response match {
13 case Some(responseMessage) => println(responseMessage)
14 case None => println("Never got a response before timeout")
15 }
16 fortuneTeller.stop()
17 }
18 }

在角色的实现代码中,我们可以看到与Java版本的两点不同:其一是这里中我们用self代替了getContext()函数,另一个则是用reply()代替了replyUnsafe()函数。在调用方这一侧,我们使用了!!,即java中的sendRequestReply()函数来给角色发送消息,并在所收到的响应内容上应用了模式匹配。如果发送方收到对端的响应,则第一个case语句将被执行,而如果响应超时则第二个case语句将被执行。不出所料,该示例代码的运行结果与Java版完全相同:


1 Joe you'll rock

除了我们之前所讨论过的那些变更之外,安全版reply()函数的使用方式与Java版本差别不大。在Scala中,我们使用的是reply_?()或replySafe()。


01 class FortuneTeller extends Actor {
02 def receive = {
03 case name : String =>
04 if(self.reply_?(String.format("%s you'll rock", name)))
05 println("Message sent for " + name)
06 else
07 println("Sender not found for " + name)
08 }
09 }
10 object FortuneTeller {
11 def main(args : Array[String]) : Unit = {
12 val fortuneTeller = Actor.actorOf[FortuneTeller].start()
13 fortuneTeller ! "Bill"
14 val response = fortuneTeller !! "Joe"
15 response match {
16 case Some(responseMessage) => println(responseMessage)
17 case None => println("Never got a response before timeout")
18 }
19 fortuneTeller.stop()
20 }
21 }

通过上述修改,即使是在发送者未知的情况下,新版本的FortureTeller也不会失败:


1 Sender not found for Bill
2 Message sent for Joe
3 Joe you'll rock

Akka有一点很方便的行为就是,当我们用Akka发送消息时,它会将发送者的引用透明地传递过去。于是我们就无需显式地将发送者作为消息的一部分传递出去,从而省去了很多繁冗的代码。

如果不习惯使用像!、!!、!!!以及reply_?这样的函数名,我们也可以分别用sendOneWay()、sendRequestReply()、sendRequestReplyFuture()以及replySafe()这些函数来替换使用。



[1] Akka同样支持远程角色,以便使我们可以在不同机器的离散进程之间发送消息。

目录
相关文章
uniapp组件库uview1的u-button的问题,u-button多次点击只触发事件一次
uniapp组件库uview1的u-button的问题,u-button多次点击只触发事件一次
625 0
|
存储 人工智能 架构师
ChatGPT 与软件架构 (4) - 架构师提示工程指南
ChatGPT 与软件架构 (4) - 架构师提示工程指南
444 0
|
23天前
|
并行计算 数据挖掘 5G
MATLAB R2024b 数据分析软件,安装详细步骤,附安装包
MATLAB R2024b 发布,聚焦性能提升与稳定性优化,支持GPU加速、5G/6G工具链及HDL代码生成,新增NPU硬件支持,配合深色界面与调试增强,助力高效科学计算与工程设计。
637 3
|
1月前
|
机器学习/深度学习 人工智能 自然语言处理
34_GPT系列:从1到5的架构升级_深度解析
大型语言模型(LLM)的发展历程中,OpenAI的GPT系列无疑扮演着至关重要的角色。自2018年GPT-1问世以来,每一代GPT模型都在架构设计、预训练策略和性能表现上实现了质的飞跃。本专题将深入剖析GPT系列从1.17亿参数到能够处理百万级token上下文的技术演进,特别关注2025年8月8日发布的GPT-5如何引领大模型技术迈向通用人工智能(AGI)的重要一步。
|
7月前
|
人工智能 算法
要创新,怎少得了智能体?新鲜出炉的TRIZ发明原则AI助手,你不来试试?
TRIZ发明原则AI助手是一款专注于技术创新领域的智能工具,由法思诺创新团队开发。它结合了TRIZ理论中的矛盾矩阵和发明原则,旨在帮助用户聚焦具体技术问题并快速生成大量有针对性的创意解决方案。相比通用AI,该助手在专业性、准确性和实用性上更具优势,尤其适合解决技术和产品研发中的复杂问题。通过将常规问题转化为TRIZ句型、匹配工程参数、查询矛盾矩阵及提供创意思路,助手实现了从问题定义到解决方案的全流程支持。尽管仍存在一些局限性(如偶尔输出不稳定或不够专业),但通过与研发人员的协作,其潜力可得到最大化发挥。目前,团队邀请用户试用并反馈,以持续优化这一创新工具。
352 0
|
Java 编译器 Go
终于弄懂Go语言变量逃逸分析 新手不能错过这篇指南
终于弄懂Go语言变量逃逸分析 新手不能错过这篇指南
417 0
|
监控 安全 Nacos
MSE-Nacos测评报告
个人测评
386 0
|
JavaScript 网络架构
Vue 动态添加路由及生成菜单
Vue 动态添加路由及生成菜单
434 0
|
算法 5G 芯片
Beam Failure Detection
正如上篇所述NR中所有的上下行信道的发送和接收都是基于波束。基站通过对信道质量的测量来动态选择UE和基站之间波束的方向和频率,进而完成通信。NR中无线链路检测可以分为两种,一种是4G中常见的radio link monitoring,失败后对应的就是radio link failure ,主要是RRC层控制触发;另一种就是这篇提及beam 相关的Beam Failure Detection(BFD),主要是MAC层控制触发。
|
缓存 监控 安全
我开源了团队内部基于SpringBoot Web快速开发的API脚手架v1.6.0更新
什么是 rest-api-spring-boot-starter rest-api-spring-boot-starter 适用于SpringBoot Web API 快速构建让开发人员快速构建统一规范的业务RestFull API 不在去关心一些繁琐。重复工作,而是把重点聚焦到业务。