Spark修炼之道(高级篇)——Spark源码阅读:第七节 resourceOffers方法与launchTasks方法解析

简介: 在上一节中,我们提到Task提交通过makeOffers提交到Executor上 // Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Filter out executors under killin

在上一节中,我们提到Task提交通过makeOffers提交到Executor上

    // Make fake resource offers on just one executor
    private def makeOffers(executorId: String) {
      // Filter out executors under killing
      if (!executorsPendingToRemove.contains(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = Seq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))
      }
    }

上面的代码依赖于两个重要方法,它们分别是TaskSchedulerImpl resourceOffers方法及CoarseGrainedSchedulerBackend的launchTasks方法

//TaskSchedulerImpl resourceOffers方法
 /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    // 处理新的executor加入
    var newExecAvail = false
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    //随机打散,使Task均匀分配各Worker节点上
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray

    //根据调度策略获取ArrayBuffer[TaskSetManager]
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    //按就近原则进行Task调度
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

调用完resourceOffers方法后,再调用launchTasks方法,最终在Worker节点上启动任务的运行

//CoarseGrainedSchedulerBackend中的launchTasks方法
 // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        //序列化后的任何不能超过设定的大小
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //Worker节点上的CoarseGrainedExecutorBackend对象将接受LaunchTask消息,在Worker节点的Executor上启动Task的执行
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
目录
相关文章
|
8月前
|
监控 安全 网络安全
深入解析PDCERF:网络安全应急响应的六阶段方法
PDCERF是网络安全应急响应的六阶段方法,涵盖准备、检测、抑制、根除、恢复和跟进。本文详细解析各阶段目标与操作步骤,并附图例,助读者理解与应用,提升组织应对安全事件的能力。
1055 89
|
11月前
|
人工智能
歌词结构的巧妙安排:写歌词的方法与技巧解析,妙笔生词AI智能写歌词软件
歌词创作是一门艺术,关键在于巧妙的结构安排。开头需迅速吸引听众,主体部分要坚实且富有逻辑,结尾则应留下深刻印象。《妙笔生词智能写歌词软件》提供多种 AI 功能,帮助创作者找到灵感,优化歌词结构,写出打动人心的作品。
|
11月前
|
存储 算法 Java
解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用
在Java中,Set接口以其独特的“无重复”特性脱颖而出。本文通过解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用。
179 3
|
9月前
|
存储 Java 开发者
浅析JVM方法解析、创建和链接
上一篇文章《你知道Java类是如何被加载的吗?》分析了HotSpot是如何加载Java类的,本文再来分析下Hotspot又是如何解析、创建和链接类方法的。
471 132
|
7月前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
431 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
9月前
|
安全 Ubuntu Shell
深入解析 vsftpd 2.3.4 的笑脸漏洞及其检测方法
本文详细解析了 vsftpd 2.3.4 版本中的“笑脸漏洞”,该漏洞允许攻击者通过特定用户名和密码触发后门,获取远程代码执行权限。文章提供了漏洞概述、影响范围及一个 Python 脚本,用于检测目标服务器是否受此漏洞影响。通过连接至目标服务器并尝试登录特定用户名,脚本能够判断服务器是否存在该漏洞,并给出相应的警告信息。
510 84
|
11月前
|
人工智能
写歌词的技巧和方法全解析:开启你的音乐创作之旅,妙笔生词智能写歌词软件
怀揣音乐梦想,渴望用歌词抒发情感?掌握关键技巧,你也能踏上创作之旅。灵感来自生活点滴,主题明确,语言简洁,韵律和谐。借助“妙笔生词智能写歌词软件”,AI辅助创作,轻松写出动人歌词,实现音乐梦想。
|
6月前
|
JSON 监控 网络协议
Bilibili直播信息流:连接方法与数据解析
本文详细介绍了自行实现B站直播WebSocket连接的完整流程。解析了基于WebSocket的应用层协议结构,涵盖认证包构建、心跳机制维护及数据包解析步骤,为开发者定制直播数据监控提供了完整技术方案。
|
6月前
|
安全 IDE Java
重学Java基础篇—Java Object类常用方法深度解析
Java中,Object类作为所有类的超类,提供了多个核心方法以支持对象的基本行为。其中,`toString()`用于对象的字符串表示,重写时应包含关键信息;`equals()`与`hashCode()`需成对重写,确保对象等价判断的一致性;`getClass()`用于运行时类型识别;`clone()`实现对象复制,需区分浅拷贝与深拷贝;`wait()/notify()`支持线程协作。此外,`finalize()`已过时,建议使用更安全的资源管理方式。合理运用这些方法,并遵循最佳实践,可提升代码质量与健壮性。
171 1
|
6月前
|
传感器 监控 Java
Java代码结构解析:类、方法、主函数(1分钟解剖室)
### Java代码结构简介 掌握Java代码结构如同拥有程序世界的建筑蓝图,类、方法和主函数构成“黄金三角”。类是独立的容器,承载成员变量和方法;方法实现特定功能,参数控制输入环境;主函数是程序入口。常见错误包括类名与文件名不匹配、忘记static修饰符和花括号未闭合。通过实战案例学习电商系统、游戏角色控制和物联网设备监控,理解类的作用、方法类型和主函数任务,避免典型错误,逐步提升编程能力。 **脑图速记法**:类如太空站,方法即舱段;main是发射台,static不能换;文件名对仗,括号要成双;参数是坐标,void不返航。
231 5

推荐镜像

更多
  • DNS