Akka源码分析-Actor发消息(续)

简介:

上一篇博客我们分析道mailbox同时也是一个forkjointask,run方法中,调用了processMailbox处理一定数量的消息,然后最终调用dispatcher的registerForExecution重新进行线程调度,达到循环处理邮箱消息的功能。接下来我们分析一下processMailbox函数的功能。

/**
 * Process the messages in the mailbox
 */
@tailrec private final def processMailbox(
  left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
  deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
  if (shouldProcessMessage) {
    val next = dequeue()
    if (next ne null) {
      if (Mailbox.debug) println(actor.self + " processing message " + next)
      actor invoke next
      if (Thread.interrupted())
        throw new InterruptedException("Interrupted while processing actor messages")
      processAllSystemMessages()
      if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
        processMailbox(left - 1, deadlineNs)
    }
  }

  上面是processMailbox的代码,很明显这是一个尾递归,它一次性处理dispatcher.throughput个消息,并在dispatcher.throughputDeadlineTime时间内退出此次递归。源码也比较简单,大致的处理过程就是:队列取出消息,调用actor的invoke函数,处理所有系统消息,如果还有剩余消息或者未达到时间长度限制,则继续下一条消息处理。

  代码虽然简单,但有几个隐含的点,需要说明。这是一个尾递归,且在线程池分配的某个线程中执行,则可以确保,actor处理此次批量消息时,一定是在某一个线程中,不会出现并行的情况。且该函数处理完毕后,会再被线程池进行调度,则下一次的线程跟此次应该不同。另外处理完每一条消息后都会把系统消息处理完毕,也就是说系统消息的优先级非常高。

override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

  我们再回到run方法,会发现,没有对异常进行处理。也就是说如果处理用户消息或者系统消息出现异常时,依然会进入下一次调度。我们接下来看一下ActorCell的invoke代码。

//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
  final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
        case msg                      ⇒ receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

  我们暂时先忽略对超时消息的处理,发现invoke最终调用了receiveMessage(msg)。

final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)

  receiveMessage又转而调用了actor的aroundReceive方法,actor是什么呢?

private[this] var _actor: Actor = _
  def actor: Actor = _actor
  protected def actor_=(a: Actor): Unit = _actor = a

  通过定义我们知道这就是一个普通的Actor,追踪到这里终于调用了Actor特质的函数了,但仅仅调用了aroundReceive,而不是我们常见的receive函数。

/**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  @InternalApi
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // optimization: avoid allocation of lambda
    if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
      unhandled(msg)
    }
  }

  那就看看aroundReceive的源码喽。通过官方源码说明和代码分析,我们知道这里调用了我们最终定义的receive函数,如果receive没有处理该类型的消息,则调用unhandled。默认情况下unhandled应该会发送给deadletters。

/**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]](in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }

  在invoke方法中,还有一个函数的调用也不得不说:autoReceiveMessage。通过名字我们知道它应该是自动处理的消息,而不需要自己处理。哪些消息不需要我们处理呢?

def autoReceiveMessage(msg: Envelope): Unit = {
    if (system.settings.DebugAutoReceive)
      publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
 
    msg.message match {
      case t: Terminated              ⇒ receivedTerminated(t)
      case AddressTerminated(address) ⇒ addressTerminated(address)
      case Kill                       ⇒ throw ActorKilledException("Kill")
      case PoisonPill                 ⇒ self.stop()
      case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
      case Identify(messageId)        ⇒ sender() ! ActorIdentity(messageId, Some(self))
    }
  }

  Terminated、AddressTerminated、Kill、PoisonPill、ActorSelectionMessage、Identify。怎么样这几个消息是不是比较眼熟呢。当然我们这里就不再展开分析了,不过读者如果对Terminated、AddressTerminated、Kill、PoisonPill几个消息的区别感兴趣的话,可以继续研究一下对应的处理过程。有些是抛异常,有些是调函数,还是有一些区别的。

  另外在invoke方法中,还有一个字段我们也不应该忽略:currentMessage。通过其名称以及相关的代码逻辑,我们知道这仅仅是标志当前消息。那么它有什么用呢?

final def sender(): ActorRef = currentMessage match {
    case null                      ⇒ system.deadLetters
    case msg if msg.sender ne null ⇒ msg.sender
    case _                         ⇒ system.deadLetters
  }

  翻了一下源码,最终定位到以上代码。sender()是不是比较眼熟呢?currentMessage是为了给sender()返回对应的值的。不过有几点需要注意。currentMessage只是一个“临时”值,也就意味着sender()也会是一个“临时”值。临时的意思是在此次消息处理结束之后,对应的currentMessage会变化,sender返回的值也会变化。这也就是告诉读者,sender函数返回的值只有在此次消息处理过程中有效,且只在当前线程有效。如果消息处理时,出现了Future代码块,则在Future代码块内部,不要再调用sender函数!

  invoke方法中还有一段代码同样很重要:handleNonFatalOrInterruptedException。这段代码逻辑比较简单,大概是先中止当前的消息处理,然后中止children的消息处理,最后发送一个系统消息给监督者(也就是父actor)。如果中止当前或children消息过程中也出现了异常,则就是stop所有子actor了。

final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
    // prevent any further messages to be processed until the actor has been restarted
    if (!isFailed) try {
      suspendNonRecursive()
      // suspend children
      val skip: Set[ActorRef] = currentMessage match {
        case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
        case _                                ⇒ { setFailed(self); Set.empty }
      }
      suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
      t match {
        // tell supervisor
        case _: InterruptedException ⇒
          //  NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
        case _ ⇒
          //  NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, t, uid))
      }
    } catch handleNonFatalOrInterruptedException { e ⇒
      publish(Error(e, self.path.toString, clazz(actor),
        "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
      try children foreach stop
      finally finishTerminate()
    }
  }

  到此为止,我们就把消息的处理过程分析完毕了。但这只是本地actor的大致处理逻辑,并不涉及remote和cluster的处理过程。还有dispatch对线程的调度也并没有深入分析,后面我会根据需要深入分析这些技术细节。

目录
相关文章
|
6月前
|
存储 人工智能 关系型数据库
4年10亿美金,Neon用Serverless PG证明:AI需要的不是“大”,而是“隐形”
AnalyticDB PostgreSQL 版基于Neon架构隆重推出满足 AI 时代应用开发需求的Serverless版本,并且在这之上搭载了结构化分析、向量检索、BM25全文检索和图检索,通过一套引擎满足 AI 应用丰富的数据诉求,支持MCP和OpenAI协议,为企业全面拥抱 AI 配备了数据存储、分析和应用的 “关键” 能力,帮助企业火箭式启动跑赢时代。
|
监控 架构师 Java
JVM 8 调优指南:如何进行JVM调优,JVM调优参数
这篇文章将详细介绍如何进行JVM 8调优,包括JVM 8调优参数及其应用。此外,我将提供12个实用的代码示例,每个示例都会结合JVM启动参数和Java代码。JVM调优是指通过调整Java虚拟机的配置来提升Java应用程序的性能。这包括优化堆内存设置、选择合适的垃圾收集器以及调整其他性能相关的参数。
1180 0
|
小程序
小程序滚动时使标题背景颜色改变
小程序滚动时使标题背景颜色改变
310 0
|
SQL 数据可视化 关系型数据库
【数据库工具】DBeaver:一款免费的通用数据库工具和 SQL 客户端
【数据库工具】DBeaver:一款免费的通用数据库工具和 SQL 客户端
1352 1
|
存储 弹性计算 固态存储
阿里云服务器租用价格参考,2核8G、4核16G、8核32G最新收费标准
阿里云服务器2核8G、4核16G、8核32G配置租用价格参考,2024年阿里云产品再一次降价,降价之后2核8G配置按量收费最低收费标准为0.3375元/小时,按月租用标准收费标准为136.0元/1个月。4核16G配置的阿里云服务器按量收费标准最低为0.675元/小时,按月租用标准收费标准为272.0元/1个月。8核32G配置的阿里云服务器按量收费标准最低为1.35元/小时,按月租用标准收费标准为544.0元/1个月。云服务器实例规格的地域和实例规格不同,收费标准不一样,下面是2024年阿里云服务器2核8G、4核16G、8核32G配置的最新租用收费标准。
阿里云服务器租用价格参考,2核8G、4核16G、8核32G最新收费标准
|
人工智能 自然语言处理 数据库
RAG 技术:让 AI 从 “书呆子” 变身 “开卷小天才”!
鳄叔介绍了RAG(检索增强生成)技术,这是一种让AI既能查资料又能灵活作答的方法,如同“开卷考试”的学霸。RAG结合了检索能力和生成能力,使AI能够实时获取最新信息,提供更专业、精准的回答,广泛应用于企业客服、法律咨询、医疗诊断和教育等领域。
313 0
|
监控 安全 网络安全
inishConnect(..) failed: Connection refused,服务本地正常服务器网关报400,nacos服务实例不能下线
总之,这种问题需要通过多方面的检查和校验来定位和解决,并可能需要结合实际环境的具体情况来进行相应的调整。在处理分布式系统中这类问题时,耐心和细致的调试是必不可少的。
396 13
什么是灰度图像
什么是灰度图像
888 0
|
Unix 数据安全/隐私保护 安全
|
存储 缓存 前端开发
微服务测试:关键策略和工具
开发团队越来越多地选择微服务架构而不是单体结构,以提高应用程序的敏捷性、可扩展性和可维护性。随着决定切换到模块化软件架构——其中每个服务都是一个独立的单元,具有自己的逻辑和数据库,通过 API 与其他单元通信——需要新的测试策略和新的测试工具。
469 0