《深入理解Spark:核心思想与源码分析》——3.4节SparkUI详解

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.4节SparkUI详解,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.4 SparkUI详解
任何系统都需要提供监控功能,用浏览器能访问具有样式及布局并提供丰富监控数据的页面无疑是一种简单、高效的方式。SparkUI就是这样的服务,它的架构如图3-1所示。
在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。


d9c88cae45adc431164e3ece881fa3d25ee9944f

我们先简单介绍图3-1中的各个组件:DAGScheduler是主要的产生各类SparkListener-Event的源头,它将各种SparkListenerEvent发送到listenerBus的事件队列中,listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。从图3-1中还可以看到Spark里定义了很多监听器SparkListener的实现,包括JobProgressListener、EnvironmentListener、StorageListener、ExecutorsListener,它们的类继承体系如图3-2所示。


c3b546f2e965a1d363f71fcd1dade10d483c84d6

3.4.1 listenerBus详解
listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus由以下部分组成:
事件阻塞队列:类型为LinkedBlockingQueue[SparkListenerEvent],固定大小是10 000;
监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener。
事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。因此使用listenerBus这个名字再合适不过了,到站就下车。listenerBus的实现见代码清单3-15。
代码清单3-15 LiveListenerBus的事件处理实现

private val EVENT_QUEUE_CAPACITY = 10000
    private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    private var queueFullErrorMessageLogged = false
    private var started = false
    // A counter that represents the number of events produced and consumed in the queue
    private val eventLock = new Semaphore(0)

    private val listenerThread = new Thread("SparkListenerBus") {
      setDaemon(true)
      override def run(): Unit = Utils.logUncaughtExceptions {
        while (true) {
            eventLock.acquire()
            // Atomically remove and process this event
            LiveListenerBus.this.synchronized {
                val event = eventQueue.poll
                if (event == SparkListenerShutdown) {
                    // Get out of the while loop and shutdown the daemon thread
                    return
                }
                Option(event).foreach(postToAll)
            }
        }
    }
}

def start() {
    if (started) {
        throw new IllegalStateException("Listener bus already started!")
    }
    listenerThread.start()
    started = true
    }
def post(event: SparkListenerEvent) {
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
        eventLock.release()
    } else {
        logQueueFullErrorMessage()
    }
}
  
def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

def stop() {
   if (!started) {
        throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    }
    post(SparkListenerShutdown)
    listenerThread.join()
}
LiveListenerBus中调用的postToAll方法实际定义在父类SparkListenerBus中,如代码清单3-16所示。
代码清单3-16 SparkListenerBus中的监听器调用
protected val sparkListeners = new ArrayBuffer[SparkListener]
    with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
    sparkListeners += listener
}

def postToAll(event: SparkListenerEvent) {
    event match {
        case stageSubmitted: SparkListenerStageSubmitted =>
            foreachListener(_.onStageSubmitted(stageSubmitted))
        case stageCompleted: SparkListenerStageCompleted =>
            foreachListener(_.onStageCompleted(stageCompleted))
        case jobStart: SparkListenerJobStart =>
            foreachListener(_.onJobStart(jobStart))
        case jobEnd: SparkListenerJobEnd =>
            foreachListener(_.onJobEnd(jobEnd))
        case taskStart: SparkListenerTaskStart =>
            foreachListener(_.onTaskStart(taskStart))
        case taskGettingResult: SparkListenerTaskGettingResult =>
            foreachListener(_.onTaskGettingResult(taskGettingResult))
        case taskEnd: SparkListenerTaskEnd =>
            foreachListener(_.onTaskEnd(taskEnd))
        case environmentUpdate: SparkListenerEnvironmentUpdate =>
            foreachListener(_.onEnvironmentUpdate(environmentUpdate))
        case blockManagerAdded: SparkListenerBlockManagerAdded =>
            foreachListener(_.onBlockManagerAdded(blockManagerAdded))
        case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
        case unpersistRDD: SparkListenerUnpersistRDD =>
            foreachListener(_.onUnpersistRDD(unpersistRDD))
        case applicationStart: SparkListenerApplicationStart =>
            foreachListener(_.onApplicationStart(applicationStart))
        case applicationEnd: SparkListenerApplicationEnd =>
            foreachListener(_.onApplicationEnd(applicationEnd))
        case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
        case SparkListenerShutdown =>
    }
}

private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
        try {
            f(listener)
        } catch {
            case e: Exception =>
            logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
       }
    }
}

3.4.2 构造JobProgressListener
我们以JobProgressListener为例来讲解SparkListener。JobProgressListener是SparkContext中一个重要的组成部分,通过监听listenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressLi
stener来实现任务状态跟踪的。创建JobProgressListener的代码如下。

private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)
val statusTracker = new SparkStatusTracker(this)
JobProgressListener的作用是通过HashMap、ListBuffer等数据结构存储JobId及对应的JobUIData信息,并按照激活、完成、失败等job状态统计。对于StageId、StageInfo等信息按照激活、完成、忽略、失败等Stage状态统计,并且存储StageId与JobId的一对多关系。这些统计信息最终会被JobPage和StagePage等页面访问和渲染。JobProgressListener的数据结构见代码清单3-17。
代码清单3-17 JobProgressListener维护的信息
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

    import JobProgressListener._

    type JobId = Int
    type StageId = Int
    type StageAttemptId = Int
    type PoolName = String
    type ExecutorId = String

    // Jobs:
    val activeJobs = new HashMap[JobId, JobUIData]
    val completedJobs = ListBuffer[JobUIData]()
    val failedJobs = ListBuffer[JobUIData]()
    val jobIdToData = new HashMap[JobId, JobUIData]

    // Stages:
    val activeStages = new HashMap[StageId, StageInfo]
    val completedStages = ListBuffer[StageInfo]()
    val skippedStages = ListBuffer[StageInfo]()
    val failedStages = ListBuffer[StageInfo]()
    val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
    val stageIdToInfo = new HashMap[StageId, StageInfo]
    val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
    val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
    var numCompletedStages = 0     // 总共完成的Stage数量
    var numFailedStages = 0     // 总共失败的Stage数量

    // Misc:
    val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
    def blockManagerIds = executorIdToBlockManagerId.values.toSeq

    var schedulingMode: Option[SchedulingMode] = None

    // number of non-active jobs and stages (there is no limit for active jobs   and stages):
    val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

JobProgressListener 实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法正是在listenerBus的驱动下,改变JobProgress-Listener中的各种Job、Stage相关的数据。
3.4.3 SparkUI的创建与初始化
SparkUI的创建,见代码清单3-18。
代码清单3-18 SparkUI的声明

private[spark] val ui: Option[SparkUI] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
            env.securityManager,appName))
    } else {
        None
    }

ui.foreach(_.bind())
可以看到如果不需要提供SparkUI服务,可以将属性spark.ui.enabled修改为false。其中createLiveUI实际是调用了create方法,见代码清单3-19。
代码清单3-19 SparkUI的创建
def createLiveUI(
        sc: SparkContext,
        conf: SparkConf,
        listenerBus: SparkListenerBus,
        jobProgressListener: JobProgressListener,
        securityManager: SecurityManager,
        appName: String): SparkUI =  {
    create(Some(sc), conf, listenerBus, securityManager, appName,
        jobProgressListener = Some(jobProgressListener))
  }
create方法的实现参见代码清单3-20。
代码清单3-20 creat方法的实现
private def create(
        sc: Option[SparkContext],
        conf: SparkConf,
        listenerBus: SparkListenerBus,
        securityManager: SecurityManager,
        appName: String,
        basePath: String = "",
        jobProgressListener: Option[JobProgressListener] = None): SparkUI = {

    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
        val listener = new JobProgressListener(conf)
        listenerBus.addListener(listener)
        listener
    }

    val environmentListener = new EnvironmentListener
    val storageStatusListener = new StorageStatusListener
    val executorsListener = new ExecutorsListener(storageStatusListener)
    val storageListener = new StorageListener(storageStatusListener)

    listenerBus.addListener(environmentListener)
    listenerBus.addListener(storageStatusListener)
    listenerBus.addListener(executorsListener)
    listenerBus.addListener(storageListener)

    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
        executorsListener, _jobProgressListener, storageListener, appName, basePath)
}
根据代码清单3-20,可以知道在create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener。例如,用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener等。最后创建SparkUI,Spark UI服务默认是可以被杀掉的,通过修改属性spark.ui.killEnabled为false可以保证不被杀死。initialize方法会组织前端页面各个Tab和Page的展示及布局,参见代码清单3-21。
代码清单3-21 SparkUI的初始化
private[spark] class SparkUI private (
    val sc: Option[SparkContext],
    val conf: SparkConf,
    val securityManager: SecurityManager,
    val environmentListener: EnvironmentListener,
    val storageStatusListener: StorageStatusListener,
    val executorsListener: ExecutorsListener,
    val jobProgressListener: JobProgressListener,
    val storageListener: StorageListener,
    var appName: String,
    val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
with Logging {

val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

/** Initialize all components of the server. */
def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
        createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
}
initialize()

3.4.4 Spark UI的页面布局与展示
SparkUI究竟是如何实现页面布局及展示的?JobsTab展示所有Job的进度、状态信息,这里我们以它为例来说明。JobsTab会复用SparkUI的killEnabled、SparkContext、job-ProgressListener,包括AllJobsPage和JobPage两个页面,见代码清单3-22。
代码清单3-22 JobsTab的实现

private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
    val sc = parent.sc
    val killEnabled = parent.killEnabled
    def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
    val listener = parent.jobProgressListener

    attachPage(new AllJobsPage(this))
    attachPage(new JobPage(this))
}

AllJobsPage由render方法渲染,利用jobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等,见代码清单3-23。
代码清单3-23 AllJobsPage的实现

def render(request: HttpServletRequest): Seq[Node] = {
    listener.synchronized {
        val activeJobs = listener.activeJobs.values.toSeq
        val completedJobs = listener.completedJobs.reverse.toSeq
        val failedJobs = listener.failedJobs.reverse.toSeq
        val now = System.currentTimeMillis

        val activeJobsTable =
            jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
        val completedJobsTable =
            jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
        val failedJobsTable =
            jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

        val summary: NodeSeq =
            <div>
                <ul class="unstyled">
                    {if (startTime.isDefined) {
                        // Total duration is not meaningful unless the UI is live
                        <li>
                            <strong>Total Duration: </strong>
                            {UIUtils.formatDuration(now - startTime.get)}
                        </li>
                    }}
                    <li>
                        <strong>Scheduling Mode: </strong>
                        {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
                    </li>
                    <li>
                        <a href="#active"><strong>Active Jobs:</strong></a>
                        {activeJobs.size}
                    </li>
                    <li>
                        <a href="#completed"><strong>Completed Jobs:</strong></a>
                        {completedJobs.size}
                    </li>
                    <li>
                        <a href="#failed"><strong>Failed Jobs:</strong></a>
                        {failedJobs.size}
                    </li>
                </ul>
            </div>
jobsTable用来生成表格数据,见代码清单3-24。
代码清单3-24 jobsTable处理表格的实现
private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

    val columns: Seq[Node] = {
        <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
        <th>Description</th>
        <th>Submitted</th>
        <th>Duration</th>
        <th class="sorttable_nosort">Stages: Succeeded/Total</th>
        <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
    }

    <table class="table table-bordered table-striped table-condensed sortable">
        <thead>{columns}</thead>
        <tbody>
            {jobs.map(makeRow)}
        </tbody>
    </table>
}
表格中每行数据又是通过makeRow方法渲染的,参见代码清单3-25。
代码清单3-25 生成表格中的行
def makeRow(job: JobUIData): Seq[Node] = {
    val lastStageInfo = Option(job.stageIds)
        .filter(_.nonEmpty)
        .flatMap { ids => listener.stageIdToInfo.get(ids.max) }
    val lastStageData = lastStageInfo.flatMap { s =>
        listener.stageIdToData.get((s.stageId, s.attemptId))
    }
    val isComplete = job.status == JobExecutionStatus.SUCCEEDED
    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
    val duration: Option[Long] = {
        job.startTime.map { start =>
            val end = job.endTime.getOrElse(System.currentTimeMillis())
        end - start
        }
    }
    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
    val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
    val detailUrl =
        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
    <tr>
        <td sorttable_customkey={job.jobId.toString}>
            {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
        </td>
        <td>
            <div><em>{lastStageDescription}</em></div>
            <a href={detailUrl}>{lastStageName}</a>
        </td>
            <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
            {formattedSubmissionTime}
        </td>
        <td sorttable_customkey={duration.getOrElse(-1).toString}>{formatted-Duration}</td>
        <td class="stage-progress-cell">
            {job.completedStageIndices.size}/{job.stageIds.size - job.numSkipped-Stages}
            {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
            {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
        </td>
        <td class="progress-cell">
            {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
            failed = job.numFailedTasks, skipped = job.numSkippedTasks,
            total = job.numTasks - job.numSkippedTasks)}
        </td>
    </tr>
}
代码清单3-22中的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中,参见代码清单3-26。
代码清单3-26 WebUITab的实现
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
    val pages = ArrayBuffer[WebUIPage]()
    val name = prefix.capitalize

    /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
    def attachPage(page: WebUIPage) {
        page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
        pages += page
    }

    /** Get a list of header tabs from the parent UI. */
    def headerTabs: Seq[WebUITab] = parent.getTabs

    def basePath: String = parent.getBasePath
}
JobsTab创建之后,将被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,给每一个page生成org.eclipse.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI,即加入到handlers :ArrayBuffer[ServletContextHandler]和样例类ServerInfo的rootHandler(ContextHandlerCollection)中。SparkUI继承自WebUI,attachTab方法在WebUI中实现,参见代码清单3-27。
代码清单3-27 WebUI的实现
private[spark] abstract class WebUI( securityManager: SecurityManager, port: Int,
        conf: SparkConf, basePath: String = "", name: String = "") extends Logging {

    protected val tabs = ArrayBuffer[WebUITab]()
    protected val handlers = ArrayBuffer[ServletContextHandler]()
    protected var serverInfo: Option[ServerInfo] = None
    protected val localHostName = Utils.localHostName()
    protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
    private val className = Utils.getFormattedClassName(this)

    def getBasePath: String = basePath
    def getTabs: Seq[WebUITab] = tabs.toSeq
    def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
    def getSecurityManager: SecurityManager = securityManager

    /** Attach a tab to this UI, along with all of its attached pages. */
    def attachTab(tab: WebUITab) {
        tab.pages.foreach(attachPage)
        tabs += tab
    }

    /** Attach a page to this UI. */
    def attachPage(page: WebUIPage) {
        val pagePath = "/" + page.prefix
        attachHandler(createServletHandler(pagePath,
        (request: HttpServletRequest) => page.render(request), securityManager, basePath))
    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
        (request: HttpServletRequest) => page.renderJson(request), security-Manager, basePath))
}

    /** Attach a handler to this UI. */
    def attachHandler(handler: ServletContextHandler) {
        handlers += handler
        serverInfo.foreach { info =>
            info.rootHandler.addHandler(handler)
            if (!handler.isStarted) {
                handler.start()
        }
    }
}

由于代码清单3-27所在的类中使用import org.apache.spark.ui.JettyUtils._导入了JettyUtils的静态方法,所以createServletHandler方法实际是JettyUtils 的静态方法createServletHandler。createServletHandler实际创建了javax.servlet.http.HttpServlet的匿名内部类实例,此实例实际使用(request: HttpServletRequest) => page.render(request)函数参数来处理请求,进而渲染页面呈现给用户。有关createServletHandler的实现及Jetty的相关信息,请参阅附录C。
3.4.5 SparkUI的启动
SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口,bind方法中主要的代码实现如下。
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
JettyUtils的静态方法startJettyServer的实现请参阅附录C。最终启动了Jetty提供的服务,默认端口是4040。

相关文章
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1966 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
902 0
如何使用X-Pack Spark的YarnUI、SparkUI、Spark日志、任务运行状况的分析
概述 X-Pack Spark目前是通过Yarn管理资源,在提交Spark 任务后我们经常需要知道任务的运行状况,例如在哪里看日志、怎么查看每个Executor的运行状态、每个task的运行状态,性能瓶颈点在哪里等信息。
3477 0
|
分布式计算 Spark
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
理解DAG事件循环处理器处理事件流程
1022 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1665 0
|
分布式计算 搜索推荐 Spark
Spark 源码分析之ShuffleMapTask内存数据Spill和合并
- Spark ShuffleMapTask 内存中的数据Spill到临时文件 - 临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据 - 每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中...
1782 0
|
分布式计算 Scala Spark
Spark源码分析之ResultTask处理
ResultTask 执行当前分区的计算,首先从ShuffleMapTask拿到当前分区的数据,会从所有的ShuffleMapTask都拿一遍当前的分区数据,然后调用reduceByKey自定义的函数进行计算,最后合并所有的ResultTask输出结果,进行输出
2278 0
|
分布式计算 Shell Scala
Spark源码分析之ShuffleMapTask处理
Spark源码分析之ShuffleMapTask处理,在map端对数据的处理源码分析
1671 0
|
分布式计算 Apache Spark
Spark Master启动源码分析
Spark Master启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
948 0
|
分布式计算 Spark
Spark Worker启动源码分析
Spark Worker启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
1105 0