附录A Spark2.1核心工具类Utils

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77449771 注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77449771

注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。书中附录A的内容都在本文呈现。

Utils是Spark最常用的工具类之一,Spark Core大量使用了此类提供的基础功能。即使不关心其实现也不会对理解本书对Spark源码的分析有太多影响。下面将逐个介绍Utils提供的方法。

getSystemProperties

功能描述:获取系统属性的键值对。

  def getSystemProperties: Map[String, String] = {
    System.getProperties.stringPropertyNames().asScala
      .map(key => (key, System.getProperty(key))).toMap
  }

localHostName

功能描述:获取本地机器名。

  def localHostName(): String = {
    customHostname.getOrElse(localIpAddress.getHostAddress)
  }

getDefaultPropertiesFile

功能描述:获取默认的Spark属性文件。

  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
      .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }

getPropertiesFromFile

功能描述:从文件中获取属性。

  def getPropertiesFromFile(filename: String): Map[String, String] = {
    val file = new File(filename)
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")

    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    try {
      val properties = new Properties()
      properties.load(inReader)
      properties.stringPropertyNames().asScala.map(
        k => (k, properties.getProperty(k).trim)).toMap
    } catch {
      case e: IOException =>
        throw new SparkException(s"Failed when loading Spark properties from $filename", e)
    } finally {
      inReader.close()
    }
  }

loadDefaultSparkProperties

功能描述:加载指定文件中的Spark属性,如果没有指定文件,则加载默认Spark属性文件的属性。

  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
    Option(path).foreach { confFile =>
      getPropertiesFromFile(confFile).filter { case (k, v) =>
        k.startsWith("spark.")
      }.foreach { case (k, v) =>
        conf.setIfMissing(k, v)
        sys.props.getOrElseUpdate(k, v)
      }
    }
    path
  }

getCallSite

功能描述:获取当前SparkContext的当前调用堆栈,将栈里最靠近栈底的属于Spark或者Scala核心的类压入callStack的栈顶,并将此类的方法存入lastSparkMethod;将栈里最靠近栈顶的用户类放入callStack,将此类的行号存入firstUserLine,类名存入firstUserFile,最终返回的样例类CallSite存储了最短栈和长度默认为20的最长栈的样例类。在JavaWordCount例子中,获得的数据如下:

最短栈:getOrCreate atJavaWordCount.java:48;

最长栈:org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)

org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:48)

  def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
    var lastSparkMethod = "<unknown>"
    var firstUserFile = "<unknown>"
    var firstUserLine = 0
    var insideSpark = true
    var callStack = new ArrayBuffer[String]() :+ "<unknown>"

    Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
      if (ste != null && ste.getMethodName != null
        && !ste.getMethodName.contains("getStackTrace")) {
        if (insideSpark) {
          if (skipClass(ste.getClassName)) {
            lastSparkMethod = if (ste.getMethodName == "<init>") {
              // Spark method is a constructor; get its class name
              ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
            } else {
              ste.getMethodName
            }
            callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
          } else {
            if (ste.getFileName != null) {
              firstUserFile = ste.getFileName
              if (ste.getLineNumber >= 0) {
                firstUserLine = ste.getLineNumber
              }
            }
            callStack += ste.toString
            insideSpark = false
          }
        } else {
          callStack += ste.toString
        }
      }
    }

    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
    val shortForm =
      if (firstUserFile == "HiveSessionImpl.java") {
        // To be more user friendly, show a nicer string for queries submitted from the JDBC
        // server.
        "Spark JDBC Server Query"
      } else {
        s"$lastSparkMethod at $firstUserFile:$firstUserLine"
      }
    val longForm = callStack.take(callStackDepth).mkString("\n")

    CallSite(shortForm, longForm)
  }

tryOrStopSparkContext

功能描述:用于在执行目标方法抛出异常后新启一个用于异步停止SparkContext的线程。

  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
    try {
      block
    } catch {
      case e: ControlThrowable => throw e
      case t: Throwable =>
        val currentThreadName = Thread.currentThread().getName
        if (sc != null) {
          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
          sc.stopInNewThread()
        }
        if (!NonFatal(t)) {
          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
          throw t
        }
    }
  }

getCurrentUserName

功能描述:用于获取当前用户的用户名。此用户名默认为当前系统的登录用户,也可以通过系统环境变量SPARK_USER进行设置。

  def getCurrentUserName(): String = {
    Option(System.getenv("SPARK_USER"))
      .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
  }

getUserJars

功能描述:获取用户设置的jar文件。当用户选择的部署模式是yarn时,_jars是由spark.jars属性指定的jar文件和spark.yarn.dist.jars属性指定的jar文件的并集。其它模式下只采用由spark.jars属性指定的jar文件。

  def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
    val sparkJars = conf.getOption("spark.jars")
    if (conf.get("spark.master") == "yarn" && isShell) {
      val yarnJars = conf.getOption("spark.yarn.dist.jars")
      unionFileLists(sparkJars, yarnJars).toSeq
    } else {
      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    }
  }

startServiceOnPort

功能描述:Scala跟其它脚本语言一样,函数也可以传递,此方法正是通过回调startService这个函数来启动服务,并最终返回startService返回的service地址及端口。如果启动过程有异常,还会多次重试,直到达到maxRetries表示的最大次数。

def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      conf: SparkConf,
      serviceName: String = ""): (T, Int) = {
    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    val maxRetries = portMaxRetries(conf)
    for (offset <- 0 to maxRetries) {
      val tryPort = if (startPort == 0) {
        startPort
      } else {
        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
      }
      try {
        val (service, port) = startService(tryPort)
        logInfo(s"Successfully started service$serviceString on port $port.")
        return (service, port)
      } catch {
        case e: Exception if isBindCollision(e) =>
          if (offset >= maxRetries) {
            val exceptionMessage =
              s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
            val exception = new BindException(exceptionMessage)
            exception.setStackTrace(e.getStackTrace)
            throw exception
          }
          logWarning(s"Service$serviceString could not bind on port $tryPort. " +
            s"Attempting port ${tryPort + 1}.")
      }
    }
    throw new SparkException(s"Failed to start service$serviceString on port $startPort")
  }

createDirectory

功能描述:用spark+UUID的方式创建临时文件目录,如果创建失败会多次重试,最多重试10次。
  def createDirectory(root: String, namePrefix: String = "spark"): File = {
    var attempts = 0
    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
          maxAttempts + " attempts!")
      }
      try {
        dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
        if (dir.exists() || !dir.mkdirs()) {
          dir = null
        }
      } catch { case e: SecurityException => dir = null; }
    }

    dir.getCanonicalFile
  }

isRunningInYarnContainer

功能描述:判断是否运行在Yarn的Container中。实际是根据系统环境变量是否包含CONTAINER_ID为依据。CONTAINER_ID是由Yarn所设置的。

  private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
    conf.getenv("CONTAINER_ID") != null
  }

getYarnLocalDirs

功能描述:获取Yarn所批准的本地目录。Yarn会向系统环境变量中设置LOCAL_DIRS来指定所批准的本地目录。
  private def getYarnLocalDirs(conf: SparkConf): String = {
    val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")

    if (localDirs.isEmpty) {
      throw new Exception("Yarn Local dirs can't be empty")
    }
    localDirs
  }

getConfiguredLocalDirs

功能描述:获取所配置的本地目录。如果当前Executor或者Driver运行在Yarn的Container中,则获取Yarn所批准的本地目录。否则如果系统环境变量包含SPARK_EXECUTOR_DIRS,那就获取SPARK_EXECUTOR_DIRS所指定的目录。否则如果系统环境变量包含SPARK_LOCAL_DIRS那就获取SPARK_LOCAL_DIRS所指定的目录。否则如果系统环境变量包含MESOS_DIRECTORY,并且当前不允许shuffle(由于Mesos中的任务运行结束后,会自动清除所有的临时文件,因此任务结束后的中间输出将不复存在,使shuffle无法正常工作),那么就获取MESOS_DIRECTORY指定的目录。否则将获取spark.local.dir属性或者系统属性java.io.tmpdir指定的目录。
  def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
    if (isRunningInYarnContainer(conf)) {
      getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
      conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
      Array(conf.getenv("MESOS_DIRECTORY"))
    } else {
      if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
        logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
          "spark.shuffle.service.enabled is enabled.")
      }
      conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
  }

createTempDir

功能描述:在Spark一级目录下创建临时目录,并将目录注册到关闭钩子管理器ShutdownHookManager中,这样当JVM退出时,就可以对这些目录进行删除。
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }

chmod700

功能描述:给文件所有者授予文件的读、写、执行的权限,而同组和其他用户无任何权限。
  def chmod700(file: File): Boolean = {
    file.setReadable(false, false) &&
    file.setReadable(true, true) &&
    file.setWritable(false, false) &&
    file.setWritable(true, true) &&
    file.setExecutable(false, false) &&
    file.setExecutable(true, true)
  }

getOrCreateLocalRootDirsImpl

功能描述:在获取的本地目录下创建临时目录,并给临时目录进行授权。
  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
    getConfiguredLocalDirs(conf).flatMap { root =>
      try {
        val rootDir = new File(root)
        if (rootDir.exists || rootDir.mkdirs()) {
          val dir = createTempDir(root)
          chmod700(dir)
          Some(dir.getAbsolutePath)
        } else {
          logError(s"Failed to create dir in $root. Ignoring this directory.")
          None
        }
      } catch {
        case e: IOException =>
          logError(s"Failed to create local root dir in $root. Ignoring this directory.")
          None
      }
    }
  }

getOrCreateLocalRootDirs

功能描述:在获取的本地目录下创建临时目录,并给临时目录进行授权。
  private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
    if (localRootDirs == null) {
      this.synchronized {
        if (localRootDirs == null) {
          localRootDirs = getOrCreateLocalRootDirsImpl(conf)
        }
      }
    }
    localRootDirs
  }

getLocalDir

功能描述:查询Spark本地文件的一级目录。
def getLocalDir(conf: SparkConf): String = {
    getOrCreateLocalRootDirs(conf)(0)
  }

getFormattedClassName

功能描述:过滤类的简单名称中的$符号。
  def getFormattedClassName(obj: AnyRef): String = {
    obj.getClass.getSimpleName.replace("$", "")
  }

nonNegativeHash

功能描述:根据指定对象,获取非负的哈希值。
  def nonNegativeHash(obj: AnyRef): Int = {
    // Required ?
    if (obj eq null) return 0

    val hash = obj.hashCode
    // math.abs fails for Int.MinValue
    val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0

    // Nothing else to guard against ?
    hashAbs
  }

nonNegativeMod

功能描述:对输入参数x和mod进行取模运算,如果取模的结果rawMod小于0,会以rawMod与mod的和作为返回值,否则以rawMod作为返回值。
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

tryWithSafeFinally

功能描述:以安全方式调用block函数,其实质为当在finally中调用finallyBlock函数发生异常时,不要压制外层catch块里的异常。
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        // Purposefully not using NonFatal, because even fatal exceptions
        // we don't want to have our finallyBlock suppress
        originalThrowable = t
        throw originalThrowable
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable =>
          if (originalThrowable != null) {
            originalThrowable.addSuppressed(t)
            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
            throw originalThrowable
          } else {
            throw t
          }
      }
    }
  }

tryOrIOException

功能描述:执行带有返回值的代码块,并将任何非致命的未捕获异常转换为IOException抛出。

  def tryOrIOException[T](block: => T): T = {
    try {
      block
    } catch {
      case e: IOException =>
        logError("Exception encountered", e)
        throw e
      case NonFatal(e) =>
        logError("Exception encountered", e)
        throw new IOException(e)
    }
  }

deleteRecursively

功能描述:用于删除文件或者删除目录及其子目录、子文件,并且从关闭钩子管理器ShutdownHookManager中移除此文件或目录。
  def deleteRecursively(file: File) {
    if (file != null) {
      try {
        if (file.isDirectory && !isSymlink(file)) {
          var savedIOException: IOException = null
          for (child <- listFilesSafely(file)) {
            try {
              deleteRecursively(child)
            } catch {
              // In case of multiple exceptions, only last one will be thrown
              case ioe: IOException => savedIOException = ioe
            }
          }
          if (savedIOException != null) {
            throw savedIOException
          }
          ShutdownHookManager.removeShutdownDeleteDir(file)
        }
      } finally {
        if (!file.delete()) {
          // Delete can also fail if the file simply did not exist
          if (file.exists()) {
            throw new IOException("Failed to delete: " + file.getAbsolutePath)
          }
        }
      }
    }
  }

getSparkClassLoader

功能描述:获取加载当前class的ClassLoader。
  def getSparkClassLoader = getClass.getClassLoader

bytesToString

功能描述:将字节数转换为人类可读的字符串,例如"4.0 MB"。
  def bytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
      if (size >= 2*TB) {
        (size.asInstanceOf[Double] / TB, "TB")
      } else if (size >= 2*GB) {
        (size.asInstanceOf[Double] / GB, "GB")
      } else if (size >= 2*MB) {
        (size.asInstanceOf[Double] / MB, "MB")
      } else if (size >= 2*KB) {
        (size.asInstanceOf[Double] / KB, "KB")
      } else {
        (size.asInstanceOf[Double], "B")
      }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }

classForName

功能描述:加载指定的Class。
  def classForName(className: String): Class[_] = {
    Class.forName(className, true, getContextOrSparkClassLoader)
  }

getThreadDump

功能描述:获取线程转储。
  def getThreadDump(): Array[ThreadStackTrace] = {
    val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
    threadInfos.sortBy(_.getThreadId).map(threadInfoToThreadStackTrace)
  }

CallerContext

功能描述:Utils工具类中提供的保存调用者上下文信息的类型。
private[spark] class CallerContext(
   from: String,
   appId: Option[String] = None,
   appAttemptId: Option[String] = None,
   jobId: Option[Int] = None,
   stageId: Option[Int] = None,
   stageAttemptId: Option[Int] = None,
   taskId: Option[Long] = None,
   taskAttemptNumber: Option[Int] = None) extends Logging {

   val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
   val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
   val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
   val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
   val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
   val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
   val taskAttemptNumberStr =
     if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""

   val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
     jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr

  def setCurrentContext(): Unit = {
    if (CallerContext.callerContextSupported) {
      try {
        val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
        val builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
        val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
        val hdfsContext = builder.getMethod("build").invoke(builderInst)
        callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
      } catch {
        case NonFatal(e) =>
          logWarning("Fail to set Spark caller context", e)
      }
    }
  }
}

memoryStringToMb

功能描述:将字符串转换为字节,然后将字节转换为Mb。
  def memoryStringToMb(str: String): Int = {
    (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
  }

getMaxResultSize

功能描述:用于获取结果总大小的字节限制。
  def getMaxResultSize(conf: SparkConf): Long = {
    memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
  }

tempFileWith

功能描述:用于根据指定的路径,返回临时文件的路径。
  def tempFileWith(path: File): File = {
    new File(path.getAbsolutePath + "." + UUID.randomUUID())
  }

copyStream

功能描述:用于将输入流中的字节拷贝到输出流中。
  def copyStream(in: InputStream,
                 out: OutputStream,
                 closeStreams: Boolean = false,
                 transferToEnabled: Boolean = false): Long =
  {
    var count = 0L
    tryWithSafeFinally {
      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
        && transferToEnabled) {
        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
        val initialPos = outChannel.position()
        val size = inChannel.size()

        while (count < size) {
          count += inChannel.transferTo(count, size - count, outChannel)
        }

        val finalPos = outChannel.position()
        assert(finalPos == initialPos + size,
          s"""
             |Current position $finalPos do not equal to expected position ${initialPos + size}
             |after transferTo, please check your kernel version to see if it is 2.6.32,
             |this is a kernel bug which will lead to unexpected behavior when using transferTo.
             |You can set spark.file.transferTo = false to disable this NIO feature.
           """.stripMargin)
      } else {
        val buf = new Array[Byte](8192)
        var n = 0
        while (n != -1) {
          n = in.read(buf)
          if (n != -1) {
            out.write(buf, 0, n)
            count += n
          }
        }
      }
      count
    } {
      if (closeStreams) {
        try {
          in.close()
        } finally {
          out.close()
        }
      }
    }
  }

logUncaughtExceptions

功能描述:执行函数f,并捕获异常后打印错误日志。
  def logUncaughtExceptions[T](f: => T): T = {
    try {
      f
    } catch {
      case ct: ControlThrowable =>
        throw ct
      case t: Throwable =>
        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        throw t
    }
  }

getContextOrSparkClassLoader

功能描述:用于获取线程上下文的ClassLoader,没有设置时获取加载Spark的ClassLoader。
def getContextOrSparkClassLoader =
    Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)

doFetchFile

功能描述:使用URLConnection通过http协议下载文件。
  private def doFetchFile(
      url: String,
      targetDir: File,
      filename: String,
      conf: SparkConf,
      securityMgr: SecurityManager,
      hadoopConf: Configuration) {
    val targetFile = new File(targetDir, filename)
    val uri = new URI(url)
    val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
    Option(uri.getScheme).getOrElse("file") match {
      case "spark" =>
        if (SparkEnv.get == null) {
          throw new IllegalStateException(
            "Cannot retrieve files with 'spark' scheme without an active SparkEnv.")
        }
        val source = SparkEnv.get.rpcEnv.openChannel(url)
        val is = Channels.newInputStream(source)
        downloadFile(url, is, targetFile, fileOverwrite)
      case "http" | "https" | "ftp" =>
        var uc: URLConnection = null
        if (securityMgr.isAuthenticationEnabled()) {
          logDebug("fetchFile with security enabled")
          val newuri = constructURIForAuthentication(uri, securityMgr)
          uc = newuri.toURL().openConnection()
          uc.setAllowUserInteraction(false)
        } else {
          logDebug("fetchFile not using security")
          uc = new URL(url).openConnection()
        }
        Utils.setupSecureURLConnection(uc, securityMgr)

        val timeoutMs =
          conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
        uc.setConnectTimeout(timeoutMs)
        uc.setReadTimeout(timeoutMs)
        uc.connect()
        val in = uc.getInputStream()
        downloadFile(url, in, targetFile, fileOverwrite)
      case "file" =>
        // In the case of a local file, copy the local file to the target directory.
        // Note the difference between uri vs url.
        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
        copyFile(url, sourceFile, targetFile, fileOverwrite)
      case _ =>
        val fs = getHadoopFileSystem(uri, hadoopConf)
        val path = new Path(uri)
        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
                      filename = Some(filename))
    }
  }

fetchFile

功能描述:如果文件在本地有缓存,则从本地获取,否则通过HTTP远程下载。最后对.tar、.tar.gz等格式的文件解压缩后,调用shell命令行的chmod命令给文件增加a+x的权限。
  def fetchFile(
      url: String,
      targetDir: File,
      conf: SparkConf,
      securityMgr: SecurityManager,
      hadoopConf: Configuration,
      timestamp: Long,
      useCache: Boolean) {
    val fileName = decodeFileNameInURI(new URI(url))
    val targetFile = new File(targetDir, fileName)
    val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
    if (useCache && fetchCacheEnabled) {
      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
      val lockFileName = s"${url.hashCode}${timestamp}_lock"
      val localDir = new File(getLocalDir(conf))
      val lockFile = new File(localDir, lockFileName)
      val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
      val lock = lockFileChannel.lock()
      val cachedFile = new File(localDir, cachedFileName)
      try {
        if (!cachedFile.exists()) {
          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
        }
      } finally {
        lock.release()
        lockFileChannel.close()
      }
      copyFile(
        url,
        cachedFile,
        targetFile,
        conf.getBoolean("spark.files.overwrite", false)
      )
    } else {
      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
    }

    // Decompress the file if it's a .tar or .tar.gz
    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
      logInfo("Untarring " + fileName)
      executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
    } else if (fileName.endsWith(".tar")) {
      logInfo("Untarring " + fileName)
      executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
    }
    // Make the file executable - That's necessary for scripts
    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")

    // Windows does not grant read permission by default to non-admin users
    // Add read permission to owner explicitly
    if (isWindows) {
      FileUtil.chmod(targetFile.getAbsolutePath, "u+r")
    }
  }

executeAndGetOutput

功能描述:执行一条command命令,并且获取它的输出。调用stdoutThread的join方法,让当前线程等待stdoutThread执行完成。
  def executeAndGetOutput(
      command: Seq[String],
      workingDir: File = new File("."),
      extraEnvironment: Map[String, String] = Map.empty,
      redirectStderr: Boolean = true): String = {
    val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
    val output = new StringBuilder
    val threadName = "read stdout for " + command(0)
    def appendToOutput(s: String): Unit = output.append(s).append("\n")
    val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
    val exitCode = process.waitFor()
    stdoutThread.join()   // Wait for it to finish reading output
    if (exitCode != 0) {
      logError(s"Process $command exited with code $exitCode: $output")
      throw new SparkException(s"Process $command exited with code $exitCode")
    }
    output.toString
  }

memoryStringToMb

功能描述:将内存大小字符串转换为以MB为单位的整型值。
  def memoryStringToMb(str: String): Int = {
    (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
  }

extractHostPortFromSparkUrl

功能描述:从格式为spark://host:port的Spark URL中抽取出host和port。
  @throws(classOf[SparkException])
  def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = {
    try {
      val uri = new java.net.URI(sparkUrl)
      val host = uri.getHost
      val port = uri.getPort
      if (uri.getScheme != "spark" ||
        host == null ||
        port < 0 ||
        (uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null
        uri.getFragment != null ||
        uri.getQuery != null ||
        uri.getUserInfo != null) {
        throw new SparkException("Invalid master URL: " + sparkUrl)
      }
      (host, port)
    } catch {
      case e: java.net.URISyntaxException =>
        throw new SparkException("Invalid master URL: " + sparkUrl, e)
    }
  }

isDynamicAllocationEnabled

功能描述:判断是否启用了动态分配。
  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
    val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
    dynamicAllocationEnabled &&
      (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
  }



关于Spark内核设计的艺术 架构设计与实现

经过近一年的准备,《 Spark内核设计的艺术 架构设计与实现 》一书现已出版发行,图书如图:


纸质版售卖链接如下:
电子版售卖链接如下:

相关文章
|
分布式计算 Spark
Spark2.1命令工具类CommandUtils的源码分析
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77450103 注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。
1370 0
|
分布式计算 Spark
Spark2.1 RPC工具类RpcUtils
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77450157 注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。
1258 0
|
Web App开发 分布式计算 Java
Spark中常用工具类Utils的简明介绍
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/50904662 《深入理...
1254 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
101 0
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
82 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
110 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
74 1
下一篇
DataWorks