A little progress every day ~ let's get started!
abstract class RDD[T: ClassTag](
    // @transient marks a field as transient, i.e. excluded when the object is serialized
    @transient private var _sc: SparkContext,
    // Seq is an ordered sequence: elements keep their insertion order and duplicates are allowed
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
    // This is a warning instead of an exception in order to avoid breaking user programs that
    // might have defined nested RDDs without running jobs with them.
    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
  }

  // The SparkContext must still be alive when an RDD is used: transformations and actions can
  // only be invoked by the driver, never from inside another transformation
  private def sc: SparkContext = {
    if (_sc == null) {
      throw new SparkException(
        "RDD transformations and actions can only be invoked by the driver, not inside of other " +
        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because " +
        "the values transformation and count action cannot be performed inside of the rdd1.map " +
        "transformation. For more information, see SPARK-5063.")
    }
    _sc
  }

  // Auxiliary constructor for the common one-to-one case: a child RDD built from a single parent
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

From here the walkthrough moves into SparkContext, starting with its configuration accessors.

  // The SparkConf of this context; getConf hands out a clone so callers cannot modify it
  private[spark] def conf: SparkConf = _conf
  def getConf: SparkConf = conf.clone()

  // Jars and files shipped with the application, plus the basic master/app-name settings
  def jars: Seq[String] = _jars
  def files: Seq[String] = _files
  def master: String = _conf.get("spark.master")
  def appName: String = _conf.get("spark.app.name")

  private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
  private[spark] def eventLogDir: Option[URI] = _eventLogDir
  private[spark] def eventLogCodec: Option[String] = _eventLogCodec

  // The external block store folder is named "spark-" plus a random UUID
  val externalBlockStoreFolderName = "spark-" + randomUUID.toString()

  // True when running in local mode ("local" or "local[N]")
  def isLocal: Boolean = (master == "local" || master.startsWith("local["))
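To make the one-parent auxiliary constructor concrete, here is a minimal sketch of a custom RDD defined outside the Spark codebase. TimesTwoRDD is a hypothetical name chosen for this example; passing the parent to RDD[Int](parent) goes through the auxiliary constructor above, so the only dependency is a OneToOneDependency on that parent.

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical one-to-one child RDD: doubles every element of its single parent.
class TimesTwoRDD(parent: RDD[Int]) extends RDD[Int](parent) {

  // One child partition per parent partition, reusing the parent's partitioning.
  override protected def getPartitions: Array[Partition] = parent.partitions

  // Compute a child partition by reading the matching parent partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    parent.iterator(split, context).map(_ * 2)
}

With a live context, new TimesTwoRDD(sc.parallelize(1 to 3)).collect() would yield Array(2, 4, 6).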
  // The listener bus used to post Spark events to registered listeners
  private[spark] val listenerBus = new LiveListenerBus

  // Exposed so it can be overridden in tests: creates the driver-side SparkEnv
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

  // The SparkEnv holding the runtime services for this context
  private[spark] def env: SparkEnv = _env

  // Files and jars added through addFile/addJar, keyed by path with the timestamp of the add
  private[spark] val addedFiles = HashMap[String, Long]()
  private[spark] val addedJars = HashMap[String, Long]()

  // Keeps track of all RDDs on which persist() has been called
  private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

  // A Hadoop Configuration reused across Hadoop-related operations
  def hadoopConfiguration: Configuration = _hadoopConfiguration

  // Amount of memory (in MB) to use per executor
  private[spark] def executorMemory: Int = _executorMemory

  // Environment variables to pass to the executors
  private[spark] val executorEnvs = HashMap[String, String]()

  // The user who is running this SparkContext
  val sparkUser = Utils.getCurrentUserName()

  // Unique identifier of the submitted application, i.e. the id under which resources are
  // requested when submitting to YARN or running in local mode
  def applicationId: String = _applicationId
  def applicationAttemptId: Option[String] = _applicationAttemptId

  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

  private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger
  private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
    _executorAllocationManager
  private[spark] def cleaner: Option[ContextCleaner] = _cleaner

  private[spark] var checkpointDir: Option[String] = None

  // Thread-local properties that users can set to pass information along with the jobs
  // submitted from that thread
  protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = {
      // Clone the parent so that later changes in the parent thread do not affect the
      // child's semantics (SPARK-10563)
      if (conf.get("spark.localProperties.clone", "false").toBoolean) {
        SerializationUtils.clone(parent).asInstanceOf[Properties]
      } else {
        new Properties(parent)
      }
    }
    override protected def initialValue(): Properties = new Properties()
  }

  private def warnSparkMem(value: String): String = {
    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
      "deprecated, please use spark.executor.memory instead.")
    value
  }

  // Set the log level; valid levels are ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
  def setLogLevel(logLevel: String) {
    val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
    if (!validLevels.contains(logLevel)) {
      throw new IllegalArgumentException(
        s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
    }
    Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
  }

  // Validate the configuration required by the chosen deploy mode
  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  // yarn-standalone is deprecated, but still supported
  if ((master == "yarn-cluster" || master == "yarn-standalone") &&
      !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }
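As a quick illustration of how the thread-local properties and setLogLevel shown above are used from application code, here is a small, self-contained sketch. It assumes nothing beyond a local-mode SparkContext; "reporting" is just an arbitrary fair-scheduler pool name picked for the example.

import org.apache.spark.{SparkConf, SparkContext}

object LocalPropertiesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("local-properties-demo"))
    try {
      sc.setLogLevel("WARN") // must be one of the levels validated above

      // Local properties live in the InheritableThreadLocal shown above: they apply to the
      // jobs submitted from this thread (and, as a copy, to its child threads).
      sc.setLocalProperty("spark.scheduler.pool", "reporting")
      sc.parallelize(1 to 100).count() // this job carries the property

      sc.setLocalProperty("spark.scheduler.pool", null) // passing null removes the property
      println(sc.getLocalProperty("spark.scheduler.pool")) // prints null
    } finally {
      sc.stop()
    }
  }
}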
  _conf.setIfMissing("spark.driver.host", Utils.localHostName())
  _conf.setIfMissing("spark.driver.port", "0")
  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  // Parse the comma-separated spark.jars and spark.files settings, dropping empty entries
  _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
    .toSeq.flatten

  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  // The JobProgressListener must be created and registered before the SparkEnv, because
  // creating the SparkEnv already posts events to it; otherwise those events would be lost
  _jobProgressListener = new JobProgressListener(_conf)
  listenerBus.addListener(jobProgressListener)

  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)

  _statusTracker = new SparkStatusTracker(this)

  _progressBar =
    if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
        _env.securityManager, appName, startTime = startTime))
    } else {
      None
    }

  // Register the jars and files supplied with the application
  if (jars != null) {
    jars.foreach(addJar)
  }
  if (files != null) {
    files.foreach(addFile)
  }

  // Resolve the executor memory from the application setting, falling back to environment
  // variables and finally a 1024 MB default
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // The HeartbeatReceiver must be registered before createTaskScheduler is called, because
  // each Executor retrieves the HeartbeatReceiver in its constructor
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
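The executor-memory resolution above is just a chain of Options. The standalone sketch below (not Spark code; it uses a simplified stand-in for Utils.memoryStringToMb that only understands the "512m"/"2g" forms used here) makes the precedence explicit.

object ExecutorMemoryResolution {

  // Simplified stand-in for Utils.memoryStringToMb: handles only "...m" and "...g" strings.
  private def memoryStringToMb(s: String): Int = {
    val lower = s.trim.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else lower.stripSuffix("m").toInt
  }

  // Same precedence as above: spark.executor.memory, then SPARK_EXECUTOR_MEMORY,
  // then the deprecated SPARK_MEM, and finally a 1024 MB default.
  def resolve(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM")) // Spark would log a deprecation warning here
      .map(memoryStringToMb)
      .getOrElse(1024)

  def main(args: Array[String]): Unit = {
    println(resolve(Map("spark.executor.memory" -> "2g"), Map.empty)) // 2048
    println(resolve(Map.empty, Map("SPARK_EXECUTOR_MEMORY" -> "512m"))) // 512
    println(resolve(Map.empty, Map.empty)) // 1024
  }
}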