Reading the SparkContext Source


SparkContext is the entry point to Spark: through it you connect to the cluster, create RDDs, broadcast variables, and so on.
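
As a minimal usage sketch (master, app name, and the data below are placeholders), a typical driver program does all three of those things:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("demo")
val sc = new SparkContext(conf)          // connect to the cluster

val rdd = sc.parallelize(1 to 100)       // create an RDD
val factor = sc.broadcast(2)             // broadcast a read-only variable
println(rdd.map(_ * factor.value).sum()) // use both on the executors
sc.stop()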

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

 private val creationSite: CallSite = Utils.getCallSite()

// If two SparkContexts are declared, a warning is logged instead of an exception being thrown, so the driver does not exit
 private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

// Prevent two SparkContexts from being active at the same time
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
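
For illustration only (the config key is the one read above, but running multiple contexts is discouraged), opting in looks roughly like this:

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("multi-context-demo")                  // placeholder name
  .set("spark.driver.allowMultipleContexts", "true") // downgrade the error to a warning
val sc1 = new SparkContext(conf)
val sc2 = new SparkContext(conf) // logs a warning instead of throwing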

  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

  val startTime = System.currentTimeMillis()

// No-arg constructor: when the job is launched via spark-submit, configuration is picked up from system properties
  def this() = this(new SparkConf())


  def this(master: String, appName: String, conf: SparkConf) =
    this(SparkContext.updatedConf(conf, master, appName))

// preferredNodeLocationData was used to look up nodes at startup and launch the corresponding containers (now a no-op, see the warning below)
  def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map(),
      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
  {
    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    if (preferredNodeLocationData.nonEmpty) {
      logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
    }
    this.preferredNodeLocationData = preferredNodeLocationData
  }

// Alternative constructors
  private[spark] def this(master: String, appName: String) =
    this(master, appName, null, Nil, Map(), Map())


  private[spark] def this(master: String, appName: String, sparkHome: String) =
    this(master, appName, sparkHome, Nil, Map(), Map())


  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
    this(master, appName, sparkHome, jars, Map(), Map())



  private[spark] def conf: SparkConf = _conf

// Return a clone of the conf: the configuration cannot be changed at runtime, so callers get a defensive copy
  def getConf: SparkConf = conf.clone()
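
Because getConf returns a clone, mutating the returned object cannot reconfigure a running context; a quick sketch (spark.foo is a made-up key):

val snapshot = sc.getConf                 // defensive copy
snapshot.set("spark.foo", "bar")          // only the clone changes
assert(sc.getConf.getOption("spark.foo").isEmpty) // the live conf is untouched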


  def jars: Seq[String] = _jars
  def files: Seq[String] = _files
  def master: String = _conf.get("spark.master")
  def appName: String = _conf.get("spark.app.name")

  private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
  private[spark] def eventLogDir: Option[URI] = _eventLogDir
  private[spark] def eventLogCodec: Option[String] = _eventLogCodec


// Create the scheduler backend and the task scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// Start the TaskScheduler
  _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    _env.blockManager.initialize(_applicationId)
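
Once the scheduler has started, the application id it assigned is visible both on the context and in the conf; for example (the printed values are illustrative):

println(sc.applicationId)               // e.g. "local-1433865536131" in local mode
println(sc.getConf.get("spark.app.id")) // same value, set during initialization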


// Create a new RDD[Long] containing elements from start to end (exclusive), advancing by step
  def range(
      start: Long,
      end: Long,
      step: Long = 1,
      numSlices: Int = defaultParallelism): RDD[Long] = withScope {
    assertNotStopped()
    // when step is 0, range will run infinitely
    require(step != 0, "step cannot be 0")
    val numElements: BigInt = {
      val safeStart = BigInt(start)
      val safeEnd = BigInt(end)
      if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
        (safeEnd - safeStart) / step
      } else {
        (safeEnd - safeStart) / step + 1
      }
    }

    parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
      val partitionStart = (i * numElements) / numSlices * step + start
      val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
      def getSafeMargin(bi: BigInt): Long =
        if (bi.isValidLong) {
          bi.toLong
        } else if (bi > 0) {
          Long.MaxValue
        } else {
          Long.MinValue
        }
      val safePartitionStart = getSafeMargin(partitionStart)
      val safePartitionEnd = getSafeMargin(partitionEnd)

      new Iterator[Long] {
        private[this] var number: Long = safePartitionStart
        private[this] var overflow: Boolean = false

        override def hasNext =
          if (!overflow) {
            if (step > 0) {
              number < safePartitionEnd
            } else {
              number > safePartitionEnd
            }
          } else false

        override def next() = {
          val ret = number
          number += step
          if (number < ret ^ step < 0) {
            overflow = true
          }
          ret
        }
      }
    })
  }
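
A usage sketch: with step = 3 the element-count formula above yields 4 elements for [0, 10), and negative steps count down:

sc.range(0L, 10L, step = 3).collect()  // Array(0, 3, 6, 9)
sc.range(5L, 0L, step = -2).collect()  // Array(5, 3, 1)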

// Create an RDD by distributing a local Scala collection (an alias for parallelize)
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }
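
Since makeRDD is just a thin alias, glom() shows how the elements were sliced across partitions:

val nums = sc.makeRDD(Seq(1, 2, 3, 4), numSlices = 2)
nums.glom().collect()  // e.g. Array(Array(1, 2), Array(3, 4))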


// Read a text file from the local filesystem or HDFS and return it as an RDD[String]
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }
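
A usage sketch (the path is a placeholder; local paths and hdfs:// style URIs both work):

val lines = sc.textFile("hdfs:///data/input.txt", minPartitions = 4)
lines.filter(_.contains("ERROR")).count()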



// Load records from a flat binary file, assuming each record is a fixed number of bytes
  @Experimental
  def binaryRecords(
      path: String,
      recordLength: Int,
      conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope {
    assertNotStopped()
    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
      classOf[FixedLengthBinaryInputFormat],
      classOf[LongWritable],
      classOf[BytesWritable],
      conf = conf)
    val data = br.map { case (k, v) =>
      val bytes = v.getBytes
      assert(bytes.length == recordLength, "Byte array does not have correct length")
      bytes
    }
    data
  }
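
For example, reading fixed 16-byte records (path and length are placeholders); the assert above guarantees every array has exactly recordLength bytes:

val records = sc.binaryRecords("hdfs:///data/fixed.bin", recordLength = 16)
records.take(1).foreach(r => println(r.length)) // 16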


// Get an RDD of key-value pairs for a Hadoop SequenceFile with the given key and value classes
  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int
      ): RDD[(K, V)] = withScope {
    assertNotStopped()
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
  }
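
A sketch with a placeholder path. Note that Hadoop reuses Writable objects between records, so convert keys and values to plain Scala types before caching or collecting:

import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.sequenceFile("hdfs:///data/pairs.seq",
  classOf[IntWritable], classOf[Text], minPartitions = 4)
pairs.map { case (k, v) => (k.get, v.toString) }.collect()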

// Broadcast a read-only variable to every node in the cluster
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
      logWarning("Can not directly broadcast RDDs; instead, call collect() and "
        + "broadcast the result (see SPARK-5063)")
    }
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
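
A usage sketch: ship a lookup table once per node instead of once per task (the table contents are placeholders):

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
sc.parallelize(Seq("a", "b", "a"))
  .map(word => lookup.value.getOrElse(word, 0))
  .collect()  // Array(1, 2, 1)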

 
