开发者社区> 微服务> 正文


简介:   上一篇博客我们介绍了ActorSystem的创建过程,下面我们就研究一下actor的创建过程。 val system = ActorSystem("firstActorSystem",ConfigFactory.load()) val helloActor= system.actorOf(Props(new HelloActor),"HelloActor") helloActor ! "Hello"    普通情况下,我们一般使用ActorSystem的actorOf来创建actor,当然通过上一篇博客的介绍,我们已经知道actorOf是继承自ActorRefFactory的函数。


val system = ActorSystem("firstActorSystem",ConfigFactory.load())
val helloActor= system.actorOf(Props(new HelloActor),"HelloActor")
helloActor ! "Hello"


def actorOf(props: Props, name: String): ActorRef =
    if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false)
    else throw new UnsupportedOperationException(
      s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")

   也比较简单,就是判断一下guardianProps是不是为空,为空则调用guardian.underlying.attachChild方法创建一个ActorRef。new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() 这段代码显示在创建ActorSystemImpl时,guardianProps一定为空,具体guardianProps的作用我们暂时先忽略。

def guardian: LocalActorRef = provider.guardian
   * Reference to the supervisor used for all top-level user actors.
def guardian: LocalActorRef


 *  Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
private[akka] class LocalActorRef private[akka] (
  _system:           ActorSystemImpl,
  _props:            Props,
  _dispatcher:       MessageDispatcher,
  _mailboxType:      MailboxType,
  _supervisor:       InternalActorRef,
  override val path: ActorPath)
  extends ActorRefWithCell with LocalRef 


 override lazy val guardian: LocalActorRef = {
    val cell = rootGuardian.underlying
    val ref = new LocalActorRef(system, system.guardianProps.getOrElse(Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy)),
      defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "user")

   分析上面的代码我们看到,LocalActorRef创建时传入了几个非常重要的参数:defaultDispatcher、defaultMailbox、rootGuardian和rootPath / "user"。之所以重要,是因为通过它们我们可以再深入actor的创建过程。Dispatcher和mailbox都是actor运行非常重要的概念,其中mailbox负责存储actor收到的消息,dispatcher负责从mailbox取消息,分配线程给actor执行具体的业务逻辑。我们逐一进行简要分析。

   * The one and only default dispatcher.
  def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)


   * The id of the default dispatcher, also the full key of the
   * configuration of the default dispatcher.
  final val DefaultDispatcherId = "akka.actor.default-dispatcher"


default-dispatcher {
      # Must be one of the following
      # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
      # MessageDispatcherConfigurator with a public constructor with
      # both com.typesafe.config.Config parameter and
      # akka.dispatch.DispatcherPrerequisites parameters.
      # PinnedDispatcher must be used together with executor=thread-pool-executor.
      type = "Dispatcher"

      # Which kind of ExecutorService to use for this dispatcher
      # Valid options:
      #  - "default-executor" requires a "default-executor" section
      #  - "fork-join-executor" requires a "fork-join-executor" section
      #  - "thread-pool-executor" requires a "thread-pool-executor" section
      #  - "affinity-pool-executor" requires an "affinity-pool-executor" section
      #  - A FQCN of a class extending ExecutorServiceConfigurator
      executor = "default-executor"

      # This will be used if you have set "executor = "default-executor"".
      # If an ActorSystem is created with a given ExecutionContext, this
      # ExecutionContext will be used as the default executor for all
      # dispatchers in the ActorSystem configured with
      # executor = "default-executor". Note that "default-executor"
      # is the default value for executor, and therefore used if not
      # specified otherwise. If no ExecutionContext is given,
      # the executor configured in "fallback" will be used.
      default-executor {
        fallback = "fork-join-executor"


abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) {

  val config: Config = new CachingConfig(_config)

   * Returns an instance of MessageDispatcher given the configuration.
   * Depending on the needs the implementation may return a new instance for
   * each invocation or return the same instance every time.
  def dispatcher(): MessageDispatcher

  def configureExecutor(): ExecutorServiceConfigurator = {
    def configurator(executor: String): ExecutorServiceConfigurator = executor match {
      case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
      case "thread-pool-executor"           ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
      case "affinity-pool-executor"         ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)

      case fqcn ⇒
        val args = List(
          classOf[Config] → config,
          classOf[DispatcherPrerequisites] → prerequisites)
        prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
          case exception ⇒ throw new IllegalArgumentException(
            ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
                make sure it has an accessible constructor with a [%s,%s] signature""")
              .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)

    config.getString("executor") match {
      case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
      case other              ⇒ configurator(other)


  private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
    dispatcherConfigurators.get(id) match {
      case null ⇒
        // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
        // That shouldn't happen often and in case it does the actual ExecutorService isn't
        // created until used, i.e. cheap.
        val newConfigurator =
          if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
          else throw new ConfigurationException(s"Dispatcher [$id] not configured")

        dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
          case null     ⇒ newConfigurator
          case existing ⇒ existing

      case existing ⇒ existing


  private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
    if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)

    cfg.getString("type") match {
      case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
      case "BalancingDispatcher" ⇒
        // FIXME remove this case in 2.4
        throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
          "During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
      case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
      case fqn ⇒
        val args = List(classOf[Config] → cfg, classOf[DispatcherPrerequisites] → prerequisites)
        prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
          case exception ⇒
            throw new ConfigurationException(
              ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
                "make sure it has constructor with [com.typesafe.config.Config] and " +
                "[akka.dispatch.DispatcherPrerequisites] parameters")
                .format(fqn, cfg.getString("id")), exception)


 * Configurator for creating [[akka.dispatch.Dispatcher]].
 * Returns the same dispatcher instance for each invocation
 * of the `dispatcher()` method.
class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {

  private val instance = new Dispatcher(

   * Returns the same dispatcher instance for each invocation
  override def dispatcher(): MessageDispatcher = instance


private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId)

  跟dispatcher有点类似,也是同样的lookup创建的,当然这也是为了可配置(DefaultMailboxId = "akka.actor.default-mailbox")。跟踪lookup来到以下代码。

 private def lookupConfigurator(id: String): MailboxType = {
    mailboxTypeConfigurators.get(id) match {
      case null ⇒
        // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
        val newConfigurator = id match {
          // TODO RK remove these two for Akka 2.3
          case "unbounded" ⇒ UnboundedMailbox()
          case "bounded"   ⇒ new BoundedMailbox(settings, config(id))
          case _ ⇒
            if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [${id}] not configured")
            val conf = config(id)

            val mailboxType = conf.getString("mailbox-type") match {
              case "" ⇒ throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty")
              case fqcn ⇒
                val args = List(classOf[ActorSystem.Settings] → settings, classOf[Config] → conf)
                dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
                  case exception ⇒
                    throw new IllegalArgumentException(
                      s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" +
                        " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters",

            if (!mailboxNonZeroPushTimeoutWarningIssued) {
              mailboxType match {
                case m: ProducesPushTimeoutSemanticsMailbox if m.pushTimeOut.toNanos > 0L ⇒
                  warn(s"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), " +
                    s"which can lead to blocking behavior when sending messages to this mailbox. " +
                    s"Avoid this by setting `$id.mailbox-push-timeout-time` to `0`.")
                  mailboxNonZeroPushTimeoutWarningIssued = true
                case _ ⇒ // good; nothing to see here, move along, sir.


        mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
          case null     ⇒ newConfigurator
          case existing ⇒ existing

      case existing ⇒ existing


default-mailbox {
      # FQCN of the MailboxType. The Class of the FQCN must have a public
      # constructor with
      # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
      mailbox-type = "akka.dispatch.UnboundedMailbox"

      # If the mailbox is bounded then it uses this setting to determine its
      # capacity. The provided value must be positive.
      # NOTICE:
      # Up to version 2.1 the mailbox type was determined based on this setting;
      # this is no longer the case, the type must explicitly be a bounded mailbox.
      mailbox-capacity = 1000

      # If the mailbox is bounded then this is the timeout for enqueueing
      # in case the mailbox is full. Negative values signify infinite
      # timeout, which should be avoided as it bears the risk of dead-lock.
      mailbox-push-timeout-time = 10s

      # For Actor with Stash: The default capacity of the stash.
      # If negative (or zero) then an unbounded stash is used (default)
      # If positive then a bounded stash is used and the capacity is set using
      # the property
      stash-capacity = -1

   在lookupConfigurator函数中有一段很重要的代码:dynamicAccess.createInstanceFor[MailboxType](fqcn, args)。它同样调用了dynamicAccess创建了一个MailboxType的实例,实例的类型就是mailbox-type的值。那么akka.dispatch.UnboundedMailbox究竟又是怎么样的呢?

 * MailboxType is a factory to create MessageQueues for an optionally
 * provided ActorContext.
 * <b>Possibly Important Notice</b>
 * When implementing a custom mailbox type, be aware that there is special
 * semantics attached to `system.actorOf()` in that sending to the returned
 * ActorRef may—for a short period of time—enqueue the messages first in a
 * dummy queue. Top-level actors are created in two steps, and only after the
 * guardian actor has performed that second step will all previously sent
 * messages be transferred from the dummy queue into the real mailbox.
trait MailboxType {
  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue

trait ProducesMessageQueue[T <: MessageQueue]

 * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
final case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedMailbox.MessageQueue








   * Initialize: make a dummy cell which holds just a mailbox, then tell our
   * supervisor that we exist so that he can create the real Cell in
   * handleSupervise().
   * Call twice on your own peril!
   * This is protected so that others can have different initialization.
  def initialize(async: Boolean): this.type =
    underlying match {
      case null ⇒
        swapCell(new UnstartedCell(system, this, props, supervisor))
        supervisor.sendSystemMessage(Supervise(this, async))
        if (!async) point(false)
      case other ⇒ throw new IllegalStateException("initialize called more than once!")

   * This method is supposed to be called by the supervisor in handleSupervise()
   * to replace the UnstartedCell with the real one. It assumes no concurrent
   * modification of the `underlying` field, though it is safe to send messages
   * at any time.
  def point(catchFailures: Boolean): this.type


  到这里system.actorOf基本就算执行结束,它返回了一个InternalActorRef,这是ActorRef的一个子类。这样,后续的代码就可以使用 ! 或tell给actor发送消息了。不过我们虽然大致研究了actor的创建过程,但并没有进入深入的研究,比如,我们自身的actor的实现类是在什么时候初始化的并不知道。当然这并不妨碍我们继续研究akka的源码。


+ 订阅