How Spark Triggers Job Submission

Summary: how Spark triggers job submission. More resources on GitHub: https://github.com/opensourceteams/spark-scala-maven and CSDN (video compilation): https://blog.

More resources

YouTube video walkthrough

Bilibili video walkthrough

Client source code

  • BaseScalaSparkContext.scala
package com.opensource.bigdata.spark.standalone.base


import org.apache.spark.{SparkConf, SparkContext}

class BaseScalaSparkContext {

  var appName = "standalone"
  var master = "spark://standalone.com:7077" // local mode: "local"; standalone mode: "spark://master:7077"


  def sparkContext(): SparkContext = {
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    conf.set("spark.eventLog.enabled","true")
   // conf.set("spark.ui.port","10002")
    conf.set("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
    conf.set("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/eventLog")
    //executor debug,是在提交作的地方读取
    //conf.set("spark.executor.extraJavaOptions","-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=10002")
    conf.setJars(Array("/opt/n_001_workspaces/bigdata/spark-scala-maven/target/spark-scala-maven-1.0-SNAPSHOT.jar"))

    val sc = new SparkContext(conf)

    // Set the log level
    //sc.setLogLevel("ERROR")
    sc
  }
}

  • WorldCount.scala
package com.opensource.bigdata.spark.standalone.wordcount

import com.opensource.bigdata.spark.standalone.base.BaseScalaSparkContext

object WorldCount extends BaseScalaSparkContext{


  def main(args: Array[String]): Unit = {

    val startTime = System.currentTimeMillis()

    appName = "HelloWorld-standalone"
    //master="spark://10.211.55.2:7077"
    val sc = sparkContext()

    println("SparkContext initialized")


    val distFile:org.apache.spark.rdd.RDD[String] = sc.textFile("hdfs://standalone.com:9000/opt/data/a.txt")
    println(distFile)

    val result = distFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    println(s"Result: ${result.collect().mkString}")

    val threadName = Thread.currentThread().getId + Thread.currentThread().getName

    println(s"${threadName}===================结果:执行了毫秒:${System.currentTimeMillis() - startTime}")


    sc.stop()

  }
}
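Before tracing the source, it can help to exercise the same word-count pipeline without a standalone cluster or HDFS. The variant below is a sketch added for convenience, not part of the original repository; the object name and local input path are hypothetical.

package com.opensource.bigdata.spark.standalone.wordcount

import org.apache.spark.{SparkConf, SparkContext}

// Minimal local-mode variant of WorldCount for debugging the RDD chain
// without a cluster. The app name and input path are hypothetical.
object WorldCountLocal {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HelloWorld-local").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val distFile = sc.textFile("/tmp/a.txt") // hypothetical local input file
    val result = distFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // collect() is the action that triggers SparkContext.runJob, traced below
    println(result.collect().mkString(", "))

    sc.stop()
  }
}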

Source code analysis

WorldCount.scala

Dependencies between the RDDs

val distFile:org.apache.spark.rdd.RDD[String] = sc.textFile("hdfs://standalone.com:9000/opt/data/a.txt")
val result = distFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
-----------------------------------------------------------------
val rdd0 = HadoopRDD
val rdd1 = distFile   // sc.textFile performs a map internally: hadoopRDD.map(pair => pair._2.toString)
val rdd2 = distFile.flatMap(_.split(" "))
val rdd3 = distFile.flatMap(_.split(" ")).map((_,1))
val rdd4 = distFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

-----------------------------------------------------------------
ShuffledRDD[4]         ->  ShuffleDependency                       ->  rdd4
MapPartitionsRDD[3]    ->  OneToOneDependency(NarrowDependency)    ->  rdd3
MapPartitionsRDD[2]    ->  OneToOneDependency(NarrowDependency)    ->  rdd2
MapPartitionsRDD[1]    ->  OneToOneDependency(NarrowDependency)    ->  rdd1
HadoopRDD[0]           ->  Nil                                     ->  rdd0
-----------------------------------------------------------------
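This mapping can be checked from the driver by printing the lineage and walking the dependency chain; the snippet below is a small sketch, assuming the same sc and HDFS path as in WorldCount above.

// Sketch: inspect the RDD chain built by the two lines above.
val distFile = sc.textFile("hdfs://standalone.com:9000/opt/data/a.txt")
val result = distFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Prints the full lineage: ShuffledRDD[4] <- MapPartitionsRDD[3] <- ... <- HadoopRDD[0]
println(result.toDebugString)

// Walk the chain manually: ShuffleDependency for the last hop,
// OneToOneDependency for the map/flatMap hops, no dependencies for HadoopRDD.
var rdd: org.apache.spark.rdd.RDD[_] = result
while (rdd != null) {
  println(s"${rdd.getClass.getSimpleName}[${rdd.id}] -> ${rdd.dependencies.map(_.getClass.getSimpleName).mkString(",")}")
  rdd = rdd.dependencies.headOption.map(_.rdd).orNull
}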


RDD relationships in WorldCount

runJob calls in SparkContext

  • The RDD collect method calls SparkContext's runJob method
  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  • SparkContext runJob call chain
  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
  /**
   * Run a job on a given set of partitions of an RDD, but take a function of type
   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }
  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }
  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
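Taken together, these overloads mean that result.collect() in WorldCount ends up as a single call to the main runJob entry point with every partition id of the final RDD. A roughly equivalent explicit call is sketched below; it is an illustration rather than code from the original post, and sc and result refer to the WorldCount driver.

import org.apache.spark.TaskContext

// Sketch: what collect() effectively asks SparkContext to do for the word-count RDD.
val partitionResults: Array[Array[(String, Int)]] = sc.runJob(
  result,                                                        // the final RDD (rdd4 above)
  (ctx: TaskContext, it: Iterator[(String, Int)]) => it.toArray, // func run on each partition
  0 until result.partitions.length                               // all partition ids
)
val collected: Array[(String, Int)] = Array.concat(partitionResults: _*)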

DAGScheduler method calls

  • DAGScheduler runJob method
 /**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @throws Exception when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
  • DAGScheduler submitJob method
 /**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
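Note that submitJob itself does not block: it posts the event and returns a JobWaiter, which the runJob method shown earlier waits on while finished result tasks report back through resultHandler. The class below is a hypothetical, heavily simplified stand-in for that pattern, included only to show how the handler and the wait fit together; it is not Spark's actual JobWaiter.

import java.util.concurrent.CountDownLatch

// Hypothetical, simplified version of the JobWaiter idea: one countdown per task,
// taskSucceeded delivers each partition result and awaitResult blocks until all arrive.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val latch = new CountDownLatch(totalTasks)

  def taskSucceeded(index: Int, result: U): Unit = {
    resultHandler(index, result)   // hand the partition result to the caller
    latch.countDown()              // one fewer task outstanding
  }

  def awaitResult(): Unit = latch.await()   // what DAGScheduler.runJob blocks on
}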

Posting JobSubmitted to DAGSchedulerEventProcessLoop

  • submitJob posts the JobSubmitted event to the DAGScheduler event loop:
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
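From here, a dedicated event-loop thread takes the JobSubmitted event off an internal queue and dispatches it to DAGScheduler.handleJobSubmitted, which begins stage creation. The following is a hedged sketch of that producer/consumer pattern in the style of DAGSchedulerEventProcessLoop, not the real implementation.

import java.util.concurrent.LinkedBlockingDeque

// Hypothetical, stripped-down event loop: callers post events from any thread,
// and a single daemon thread consumes them in order.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(eventQueue.take())   // blocks until an event (e.g. JobSubmitted) arrives
        }
      } catch {
        case _: InterruptedException => // stopping: let the thread exit
      }
    }
  }

  def post(event: E): Unit = eventQueue.put(event)   // what submitJob calls
  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }

  // In Spark, a JobSubmitted event is routed to dagScheduler.handleJobSubmitted(...)
  protected def onReceive(event: E): Unit
}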