akka actors默认邮箱介绍

简介: 1. UnboundedMailbox is the default unbounded MailboxType used by Akka Actors  ”无界邮箱“ 是akka actors默认使用的邮箱, UnboundedMailbox继承了MailboxType /** *...

 1. UnboundedMailbox is the default unbounded MailboxType used by Akka Actors

 ”无界邮箱“ 是akka actors默认使用的邮箱,

UnboundedMailbox继承了MailboxType

/**
 * MailboxType is a factory to create MessageQueues for an optionally
 * provided ActorContext.
 *
 * <b>Possibly Important Notice</b>
 *
 * When implementing a custom mailbox type, be aware that there is special
 * semantics attached to `system.actorOf()` in that sending to the returned
 * ActorRef may—for a short period of time—enqueue the messages first in a
 * dummy queue. Top-level actors are created in two steps, and only after the
 * guardian actor has performed that second step will all previously sent
 * messages be transferred from the dummy queue into the real mailbox.
 */
trait MailboxType {
  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}

 和ProducesMessageQueue

trait ProducesMessageQueue[T <: MessageQueue]

 MailboxType特质的create方法接受两个参数owner和system,owner表示“消息队列”的所有者,system表示 ???

UnboundedMailbox的定义如下:

/**
 * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
 */
case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  // 使用this关键字来调用构造函数。
  // 它演示了如何从其他构造函数调用构造函数。必须确保this必须放在构造函数中的第一个语句,同时调用其他构造函数this(),否则编译器会抛出错误。
  // “=”等号右边调用的this()是指向的UnboundedMailBox()类
  def this(settings: ActorSystem.Settings, config: Config) = this()

  // 重写MailboxType的create()方法,并创建UnboundedMailbox.MessgaeQueue对象
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedMailbox.MessageQueue
}

 UnboundedMailbox中的create()方法中创建的UnboundedMailbox.MessgaeQueue对象是

object UnboundedMailbox {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}  

   MessageQueue继承了ConCurrentLinkedQueue,并且ConCurrentLinkedQueue是存放的Envelope

final case class Envelope private (val message: Any, val sender: ActorRef)

(信封)对象,信封对象中包含一个message消息和发送者sender的ActorRef。

此外,MessageQueue还with了一个UnboundedQueueBasedMessageQueue

/**
 * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue,
 * i.e. blocking enqueue with timeout.
 */
trait BoundedMessageQueueSemantics {
  def pushTimeOut: Duration
}

trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
  // 入队,这里是入队了一个信封,貌似receiver没有用到?不明白为什么要有这个receiver,可能是为了明显标记是发给那个Actor的吧
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
  // 出队,出队就是从队列中取出一个元素(一封信)
  def dequeue(): Envelope = queue.poll()
}

  UnboundedQueueBasedMessageQueue中实现了从QueueBasedMessageQueue继承来的enqueue()和dequeue()方法。更确切的话是从MessageQueue继承来的,因为QueueBasedMessageQueue是继承了MessageQueue,代码如下:

/**
 * This is a marker trait for message queues which support multiple consumers,
 * as is required by the BalancingDispatcher.
 */
trait MultipleConsumerSemantics

/**
 * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue.
 */
trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {
  def queue: Queue[Envelope]
  // 消息的数量 (队列的元素个数)
  def numberOfMessages = queue.size
  // 是否包含消息 (队列是否为空)
  def hasMessages = !queue.isEmpty
  // 清空消息 (清空队列)
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    if (hasMessages) {
      // 获取第一个消息
      var envelope = dequeue
      while (envelope ne null) {
        // 如果消息不为空,则不断向deadLetters发送消息
        // deadLetters也是一个消息队列actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue
      }
    }
  }
}

  从QueueBasedMessageQueue的定义来看,其中包含了几个重要的方法。还定义了一个queue,回顾一下UnboundedMailbox的定义

object UnboundedMailbox {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}  

   其中的queue是重写了QueueBasedMessageQueue中定义的queue,def queue: Queue[Envelope]。

 

3. 邮箱是akka里面是怎么使用的? 

如果我们是使用的ActorRef去创建一个actor,例如:

    val testActor: ActorRef = context.actorOf(
      TestActor.props().withDispatcher("test-actor-dispatcher"),
      TestActor.Name)

   

目录
相关文章
|
数据格式 JSON 存储
Lua模块的加载与内存释放
今天早上听说一件事情让我觉得很诡异的事情:公司线上的一款游戏,加载一份配置资源后,内存涨了几十M,然后内存再也下不来了。因为好奇,所以要来了最大的一个配置文件(4.5M,去除空格与换行后的大小),进行测试。
2285 0
|
SQL Oracle 关系型数据库
Oracle-index索引解读
Oracle-index索引解读
374 0
|
12月前
|
数据安全/隐私保护
vuex数据持久化、加密(vuex-persistedstate、secure-ls)
本文介绍了如何在Vuex中使用`vuex-persistedstate`和`secure-ls`库进行数据的持久化和加密,确保在Vite打包上线后,Vuex中的数据安全。
414 1
|
11月前
|
测试技术
产品测试
【10月更文挑战第10天】产品测试
306 2
|
11月前
|
存储 安全 Java
Java HashSet详解
`HashSet` 是 Java 中基于哈希表实现的 `Set` 接口集合,主要用于存储不重复元素,提供快速查找、插入和删除操作。它具有以下特点:不允许重复元素,元素无序,允许一个 `null` 元素,常用操作包括创建、添加、删除、检查元素及清空集合。由于其内部使用哈希表,基本操作的时间复杂度接近 O(1),性能高效。然而,`HashSet` 不保证元素顺序,也不是线程安全的,适用于需要快速访问和操作的场景。
401 12
|
12月前
|
存储 Shell 开发工具
8-8|windows上Git报错
8-8|windows上Git报错
|
SQL 数据挖掘 数据库
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理中,经常需要统计和查询各种汇总信息,如班级某科目的平均分
|
存储 缓存 监控
X86架构服务器硬件设计
8月更文挑战第16天
474 0
|
SQL 监控 druid
MySQL线程池导致的延时卡顿排查
## 问题描述 简单小表的主键点查SQL,单条执行很快,但是放在业务端,有时快有时慢,取了一条慢sql,在MySQL侧查看,执行时间很短。 通过Tomcat业务端监控有显示慢SQL,取slow.log里显示有12秒执行时间的SQL,但是这次12秒的执行在MySQL上记录下来的执行时间都不到1ms。 所在节点的tsar监控没有异常,Tomcat manager监控上没有fgc,Tomcat实
2148 0
MySQL线程池导致的延时卡顿排查
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
236 0

热门文章

最新文章