Akka源码分析-Actor创建(续)

简介:

在上一遍博客中,我们已经分析了actor创建的大致过程,但只是涉及到了Dipatcher/Mailbox/ActorCell/InternalActorRef等对象的创建,并没有介绍我们自定义的继承Actor特质的类如何完成初始化。这篇文章对这一部分内容进行简单的补充。
在akka.actor.dungeon.init代码中,有一段代码我们当时没有分析,此处对此代码进行深入分析,然后才能找到Actor子类完成创建的真实过程。
_
上面是init的代码片段,其中有一个局部变量createMessage,根据前后分析,它的值应该是Create这个case class。最后mailbox.systemEnqueue(self, createMessage)这个代码给actor对应的邮箱发送了该消息。

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[akka] final case class Create(failure: Option[ActorInitializationException]) extends SystemMessage // sent to self from Dispatcher.register

根据Create类名以及前后上下文分析,这应该是指示Actor完成初始化的。那么我们要分析一下actor是如何对该消息响应的。那么究竟是哪段代码对这个消息进行响应的呢?
如果读过之前的文章,你肯定能想起来Mailbox在循环处理消息时,有一个processAllSystemMessages方法,这个方法里面调用了actor的systemInvoke方法。具体源码如下:

/**
   * Will at least try to process all queued system messages: in case of
   * failure simply drop and go on to the next, because there is nothing to
   * restart here (failure is in ActorCell somewhere …). In case the mailbox
   * becomes closed (because of processing a Terminate message), dump all
   * already dequeued message to deadLetters.
   */
  final def processAllSystemMessages() {
    var interruption: Throwable = null
    var messageList = systemDrain(SystemMessageList.LNil)
    while ((messageList.nonEmpty) && !isClosed) {
      val msg = messageList.head
      messageList = messageList.tail
      msg.unlink()
      if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
      // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
      actor systemInvoke msg
      if (Thread.interrupted())
        interruption = new InterruptedException("Interrupted while processing system messages")
      // don’t ever execute normal message when system message present!
      if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil)
    }
    /*
     * if we closed the mailbox, we must dump the remaining system messages
     * to deadLetters (this is essential for DeathWatch)
     */
    // 忽略剩余源码
  }

我们来研究一下systemInvoke的代码

/*
   * MESSAGE PROCESSING
   */
  //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
  final def systemInvoke(message: SystemMessage): Unit = {
    /*
     * When recreate/suspend/resume are received while restarting (i.e. between
     * preRestart and postRestart, waiting for children to terminate), these
     * must not be executed immediately, but instead queued and released after
     * finishRecreate returns. This can only ever be triggered by
     * ChildTerminated, and ChildTerminated is not one of the queued message
     * types (hence the overwrite further down). Mailbox sets message.next=null
     * before systemInvoke, so this will only be non-null during such a replay.
     */

    def calculateState: Int =
      if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
      else if (mailbox.isSuspended) SuspendedState
      else DefaultState

    @tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
      if (messages.nonEmpty) {
        val tail = messages.tail
        val msg = messages.head
        msg.unlink()
        provider.deadLetters ! msg
        sendAllToDeadLetters(tail)
      }

    def shouldStash(m: SystemMessage, state: Int): Boolean =
      (state: @switch) match {
        case DefaultState                  ⇒ false
        case SuspendedState                ⇒ m.isInstanceOf[StashWhenFailed]
        case SuspendedWaitForChildrenState ⇒ m.isInstanceOf[StashWhenWaitingForChildren]
      }

    @tailrec
    def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
      val rest = messages.tail
      val message = messages.head
      message.unlink()
      try {
        message match {
          case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
          case f: Failed ⇒ handleFailure(f)
          case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
          case Create(failure) ⇒ create(failure)
          case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)
          case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher)
          case Recreate(cause) ⇒ faultRecreate(cause)
          case Suspend() ⇒ faultSuspend()
          case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure)
          case Terminate() ⇒ terminate()
          case Supervise(child, async) ⇒ supervise(child, async)
          case NoMessage ⇒ // only here to suppress warning
        }
      } catch handleNonFatalOrInterruptedException { e ⇒
        handleInvokeFailure(Nil, e)
      }
      val newState = calculateState
      // As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
      // chain
      val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest

      if (isTerminated) sendAllToDeadLetters(todo)
      else if (todo.nonEmpty) invokeAll(todo, newState)
    }

    invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
  }

由于我们只是准备分析actor的创建过程,所以上面的代码,我们只关注对Create消息的处理:create(failure)。也就是说调用了create函数。

protected def create(failure: Option[ActorInitializationException]): Unit = {

    def clearOutActorIfNonNull(): Unit = {
      if (actor != null) {
        clearActorFields(actor, recreate = false)
        actor = null // ensure that we know that we failed during creation
      }
    }

    failure.foreach { throw _ }

    try {
      val created = newActor()
      actor = created
      created.aroundPreStart()
      checkReceiveTimeout
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
    } catch {
      case e: InterruptedException ⇒
        clearOutActorIfNonNull()
        Thread.currentThread().interrupt()
        throw ActorInitializationException(self, "interruption during creation", e)
      case NonFatal(e) ⇒
        clearOutActorIfNonNull()
        e match {
          case i: InstantiationException ⇒ throw ActorInitializationException(
            self,
            """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
               a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new Creator ... )
               or is missing an appropriate, reachable no-args constructor.
              """, i.getCause)
          case x ⇒ throw ActorInitializationException(self, "exception during creation", x)
        }
    }
  }

我们来分析一下这个create函数。其中主要的逻辑都在try中,首先调用newActor函数,创建了Actor实例,然后赋值给actor字段。actor字段我们已经知道,这是ActorCell的最终actor实例。

/*
   * ACTOR INSTANCE HANDLING
   */

  //This method is in charge of setting up the contextStack and create a new instance of the Actor
  protected def newActor(): Actor = {
    contextStack.set(this :: contextStack.get)
    try {
      behaviorStack = emptyBehaviorStack
      val instance = props.newActor()

      if (instance eq null)
        throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")

      // If no becomes were issued, the actors behavior is its receive method
      behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
      instance
    } finally {
      val stackAfter = contextStack.get
      if (stackAfter.nonEmpty)
        contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
    }
  }

newActor函数源码如上,抛去其他代码,该函数调用了props.newActor创建了最终的Actor实例,也就是我们自定义的Actor子类。通过源码注释我们知道behaviorStack是actor当前行为的一个栈。如果读者用过become的话,对这段代码应该比较好理解。我们在actor内部使用become方法改变当前actor实例的时候,其实是把新的receive函数压入栈顶,mailbox在调用receive时,其实是取出当前栈顶的receive函数进行处理的。当然这是akka以前版本的默认行为。为什么这样说呢?因为新版本默认行为就是简单的把最新的receive函数替换旧receive函数,如果想恢复旧receive函数,需要开发者在编码时,再次调用become用旧receive函数替换当前receive。为什么要这么做?当然是为了防止开发者恶意或者无意中胡乱调用become,造成栈溢出喽。
props.newActor我们不再深入分析,这应该就是通过反射创建Actor特质的子类,也就是我们自定义的actor。
至此,我们自定义的actor就真正完成了初始化。细心的读者一定会发现,就连actor最终的实例化,都是异步的。因为newActor是通过Create消息触发的,而Mailbox对所有消息的处理都是在单独的线程处理的。如果actor的创建过程中有一些线程不安全的代码,就需要注意喽。

目录
相关文章
|
运维 Cloud Native Go
解决Nacos服务器连接问题:一次完整的排查经验分享
解决Nacos服务器连接问题:一次完整的排查经验分享
2475 1
Arkts http数据请求
Arkts http数据请求
972 0
|
JavaScript 前端开发 安全
开发业务需求有必要引入 TypeScript 吗?
随着前端技术的不断更新和发展,TypeScript作为一种静态类型的JavaScript超集语言,逐渐在业界崭露头角,尤其是在当今快速发展的软件开发环境中,选择适合的开发工具和技术变得至关重要。在项目规模和复杂性的增加的同时,保证代码质量、可读性和可维护性成为开发团队的重要任务。这样的背景下,引入TypeScript作为一种开发工具来弥补JavaScript的某些弱点,已经成为许多开发团队的选择。那么TypeScript是否值得在业务中引入?它是否会取代JavaScript?那么本文就来聊聊在业务开发过程中是否有必要引入TypeScript,并讨论一下对于现代前端框架发展的看法和期待。
359 0
开发业务需求有必要引入 TypeScript 吗?
|
人工智能 JSON API
使用 Qwen 生成数据模型和进行结构化输出
本教程展示如何使用CAMEL框架和Qwen模型生成结构化数据。CAMEL是一个强大的多智能体框架,支持复杂的AI任务;Qwen由阿里云开发,具备自然语言处理等先进能力。教程涵盖安装、API密钥设置、定义Pydantic模型,并演示了通过Qwen生成JSON格式的学生信息。最后,介绍了如何利用Qwen生成多个随机学生信息的JSON格式数据。欢迎在[CAMEL GitHub](https://github.com/camel-ai/camel)上为项目点星支持。
4246 70
|
API 数据安全/隐私保护 开发者
京东商品评论数据接口(JD.item_review)丨京东API接口指南
京东商品评论数据接口(JD.item_review)让开发者获取京东商品的评论列表、内容、时间、买家昵称等详细信息,助力产品优化和市场研究。使用步骤包括注册京东开发者账号、创建应用并申请API权限、获取API密钥、阅读API文档,最后通过HTTP请求调用接口获取数据。示例代码展示了如何使用Python进行请求。
1243 9
|
传感器 人工智能 物联网
探索智能家居技术:现状与未来
本文深入探讨了智能家居技术的发展历程、当前主要技术和应用,并展望了其未来的发展趋势。通过对现有技术的详细解析和案例分析,揭示了智能家居在提升生活品质、节能减排等方面的潜力,同时指出了目前面临的挑战和可能的解决方案。
|
域名解析 缓存 负载均衡
怎么设置腾讯云CDN缓存规则
CDN主要功能是在不同的地点缓存内容,通过负载均衡技术,将用户的请求定向到最合适的缓存服务器上去获取内容,比如说,是北京的用户,我们让他访问北京的节点,深圳的用户,我们让他访问深圳的节点。通过就近访问,加速用户对网站的访问。解决Internet网络拥堵状况,提高用户访问网络的响应速度。
805 156
怎么设置腾讯云CDN缓存规则
|
SQL 机器学习/深度学习 缓存
Go语言Web应用实战与案例分析
【2月更文挑战第21天】本文将通过实战案例的方式,深入探讨Go语言在Web应用开发中的应用。我们将分析一个实际项目的开发过程,展示Go语言在构建高性能、可扩展Web应用方面的优势,并分享在开发过程中遇到的问题和解决方案,为读者提供宝贵的实战经验。
|
小程序 前端开发 JavaScript
微信小程序|校园失物招领系统的设计与实现
微信小程序|校园失物招领系统的设计与实现
466 0
|
NoSQL Java Redis
SpringBoot整合Redis及StringRedisTemplate的使用
SpringBoot整合Redis及StringRedisTemplate的使用
495 0

热门文章

最新文章

下一篇
开通oss服务