Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/beliefer/article/details/77449771
Note: this article accompanies the book 《Spark内核设计的艺术 架构设计与实现》; it is provided to reduce cost and make the material easy to look up. Everything in Appendix A of the book appears here.
Utils is one of the most heavily used utility classes in Spark, and Spark Core relies extensively on the basic facilities it provides. Even if you do not care about its implementation, that will not greatly affect your understanding of this book's analysis of the Spark source code. The methods provided by Utils are introduced one by one below.
getSystemProperties
Description: returns the system properties as a map of key-value pairs.
def getSystemProperties: Map[String, String] = {
System.getProperties.stringPropertyNames().asScala
.map(key => (key, System.getProperty(key))).toMap
}
localHostName
Description: returns the local host name.
def localHostName(): String = {
customHostname.getOrElse(localIpAddress.getHostAddress)
}
getDefaultPropertiesFile
Description: locates the default Spark properties file (spark-defaults.conf).
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
env.get("SPARK_CONF_DIR")
.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
.filter(_.isFile)
.map(_.getAbsolutePath)
.orNull
}
getPropertiesFromFile
Description: loads properties from the given file.
def getPropertiesFromFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")
val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
try {
val properties = new Properties()
properties.load(inReader)
properties.stringPropertyNames().asScala.map(
k => (k, properties.getProperty(k).trim)).toMap
} catch {
case e: IOException =>
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
} finally {
inReader.close()
}
}
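For reference, the file read here is in the standard Java properties format. A small, purely illustrative spark-defaults.conf (the values are examples, not recommendations) might look like this:
# illustrative spark-defaults.conf
spark.master              yarn
spark.executor.memory     2g
spark.serializer          org.apache.spark.serializer.KryoSerializer
Calling getPropertiesFromFile on such a file would return Map("spark.master" -> "yarn", "spark.executor.memory" -> "2g", "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"), with each value trimmed.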
loadDefaultSparkProperties
Description: loads Spark properties from the specified file; if no file is specified, loads them from the default Spark properties file.
def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
Option(path).foreach { confFile =>
getPropertiesFromFile(confFile).filter { case (k, v) =>
k.startsWith("spark.")
}.foreach { case (k, v) =>
conf.setIfMissing(k, v)
sys.props.getOrElseUpdate(k, v)
}
}
path
}
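A minimal standalone sketch of the same merge semantics, using a plain mutable Map in place of SparkConf (the keys and values below are hypothetical): entries whose keys do not start with "spark." are dropped, and values that are already set are never overwritten:
val fileProps = Map("spark.master" -> "local[2]", "spark.app.name" -> "demo", "other.key" -> "x")
val conf = scala.collection.mutable.Map("spark.master" -> "yarn")  // pretend this came from the command line
fileProps.filter { case (k, _) => k.startsWith("spark.") }
  .foreach { case (k, v) => if (!conf.contains(k)) conf(k) = v }
// conf is now Map(spark.master -> yarn, spark.app.name -> demo): the existing value wins.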
getCallSite
Description: captures the current call stack of the SparkContext. The Spark or Scala core class closest to the bottom of the stack is placed at the top of callStack, and that class's method is recorded in lastSparkMethod; the user class closest to the top of the stack is appended to callStack, its line number recorded in firstUserLine and its file name in firstUserFile. The CallSite case class returned at the end holds the short form and a long form whose depth defaults to 20 frames. For the JavaWordCount example, the captured data looks like this:
Short form: getOrCreate at JavaWordCount.java:48
Long form: org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:48)
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
var lastSparkMethod = "<unknown>"
var firstUserFile = "<unknown>"
var firstUserLine = 0
var insideSpark = true
var callStack = new ArrayBuffer[String]() :+ "<unknown>"
Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
if (ste != null && ste.getMethodName != null
&& !ste.getMethodName.contains("getStackTrace")) {
if (insideSpark) {
if (skipClass(ste.getClassName)) {
lastSparkMethod = if (ste.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
} else {
ste.getMethodName
}
callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
} else {
if (ste.getFileName != null) {
firstUserFile = ste.getFileName
if (ste.getLineNumber >= 0) {
firstUserLine = ste.getLineNumber
}
}
callStack += ste.toString
insideSpark = false
}
} else {
callStack += ste.toString
}
}
}
val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
val shortForm =
if (firstUserFile == "HiveSessionImpl.java") {
// To be more user friendly, show a nicer string for queries submitted from the JDBC
// server.
"Spark JDBC Server Query"
} else {
s"$lastSparkMethod at $firstUserFile:$firstUserLine"
}
val longForm = callStack.take(callStackDepth).mkString("\n")
CallSite(shortForm, longForm)
}
tryOrStopSparkContext
Description: executes the given block and, if it throws an exception, starts a new thread to stop the SparkContext asynchronously.
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
try {
block
} catch {
case e: ControlThrowable => throw e
case t: Throwable =>
val currentThreadName = Thread.currentThread().getName
if (sc != null) {
logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
sc.stopInNewThread()
}
if (!NonFatal(t)) {
logError(s"throw uncaught fatal error in thread $currentThreadName", t)
throw t
}
}
}
getCurrentUserName
Description: returns the name of the current user. It defaults to the user logged in to the current system, but can also be set through the SPARK_USER environment variable.
def getCurrentUserName(): String = {
Option(System.getenv("SPARK_USER"))
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
getUserJars
Description: returns the jar files specified by the user. When the chosen deploy mode is YARN, the result is the union of the jar files specified by the spark.jars property and those specified by the spark.yarn.dist.jars property. In other modes, only the jar files specified by spark.jars are used.
def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
if (conf.get("spark.master") == "yarn" && isShell) {
val yarnJars = conf.getOption("spark.yarn.dist.jars")
unionFileLists(sparkJars, yarnJars).toSeq
} else {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
}
startServiceOnPort
Description: in Scala, as in other scripting-style languages, functions can be passed around as values. This method starts a service by calling back the startService function and finally returns the service and port that startService returns. If an exception occurs during startup, it retries repeatedly, up to the maximum number of attempts indicated by maxRetries.
def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {
require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
val tryPort = if (startPort == 0) {
startPort
} else {
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
exception.setStackTrace(e.getStackTrace)
throw exception
}
logWarning(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
}
}
throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}
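A hedged usage sketch: startServiceOnPort lives in the private[spark] Utils object, so a call like this only compiles inside Spark's own packages, and the start port 2000 and service name are arbitrary examples. The caller supplies a startService function that binds to the requested port and reports the port it actually bound; a BindException simply triggers the next retry:
import java.net.ServerSocket
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
val startService = (port: Int) => {
  val socket = new ServerSocket(port)   // throws java.net.BindException if the port is taken
  (socket, socket.getLocalPort)         // return the service and the port it actually bound
}
val (server, boundPort) = Utils.startServiceOnPort(2000, startService, conf, "demo-service")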
createDirectory
Description: creates a temporary directory whose name is the prefix (by default "spark") plus a UUID. If creation fails it retries, up to a maximum of 10 attempts.
def createDirectory(root: String, namePrefix: String = "spark"): File = {
var attempts = 0
val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
var dir: File = null
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
maxAttempts + " attempts!")
}
try {
dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs()) {
dir = null
}
} catch { case e: SecurityException => dir = null; }
}
dir.getCanonicalFile
}
isRunningInYarnContainer
Description: determines whether we are running inside a YARN container. In practice this is decided by whether the environment contains the CONTAINER_ID variable, which is set by YARN.
private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
conf.getenv("CONTAINER_ID") != null
}
getYarnLocalDirs
Description: returns the local directories approved by YARN. YARN sets the LOCAL_DIRS environment variable to specify those directories.
private def getYarnLocalDirs(conf: SparkConf): String = {
val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")
if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}
getConfiguredLocalDirs
Description: returns the configured local directories. If the current executor or driver is running in a YARN container, the YARN-approved local directories are used. Otherwise, if the SPARK_EXECUTOR_DIRS environment variable is set, the directories it specifies are used; otherwise, if SPARK_LOCAL_DIRS is set, the directories it specifies are used; otherwise, if MESOS_DIRECTORY is set and the external shuffle service is not enabled (Mesos automatically cleans up all temporary files when a task finishes, so the intermediate output would no longer exist and shuffle could not work), the directory specified by MESOS_DIRECTORY is used. Failing all of those, the directories specified by the spark.local.dir property, or by the java.io.tmpdir system property, are used.
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
if (isRunningInYarnContainer(conf)) {
getYarnLocalDirs(conf).split(",")
} else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
} else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
conf.getenv("SPARK_LOCAL_DIRS").split(",")
} else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
Array(conf.getenv("MESOS_DIRECTORY"))
} else {
if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
"spark.shuffle.service.enabled is enabled.")
}
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
}
}
createTempDir
Description: creates a temporary directory under the given Spark root directory and registers it with the shutdown-hook manager ShutdownHookManager, so that the directory can be deleted when the JVM exits.
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "spark"): File = {
val dir = createDirectory(root, namePrefix)
ShutdownHookManager.registerShutdownDeleteDir(dir)
dir
}
chmod700
Description: grants the file's owner read, write, and execute permission, while the group and other users get no permission at all (the equivalent of chmod 700).
def chmod700(file: File): Boolean = {
file.setReadable(false, false) &&
file.setReadable(true, true) &&
file.setWritable(false, false) &&
file.setWritable(true, true) &&
file.setExecutable(false, false) &&
file.setExecutable(true, true)
}
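For comparison, the same effect can be obtained with java.nio on a file system that supports POSIX permissions; a small sketch (the path below is hypothetical and must already exist):
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

val file = new java.io.File("/tmp/spark-demo")   // hypothetical, pre-existing path
Files.setPosixFilePermissions(file.toPath, PosixFilePermissions.fromString("rwx------"))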
getOrCreateLocalRootDirsImpl
Description: creates a temporary directory under each of the configured local directories and sets its permissions.
private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
getConfiguredLocalDirs(conf).flatMap { root =>
try {
val rootDir = new File(root)
if (rootDir.exists || rootDir.mkdirs()) {
val dir = createTempDir(root)
chmod700(dir)
Some(dir.getAbsolutePath)
} else {
logError(s"Failed to create dir in $root. Ignoring this directory.")
None
}
} catch {
case e: IOException =>
logError(s"Failed to create local root dir in $root. Ignoring this directory.")
None
}
}
}
getOrCreateLocalRootDirs
Description: returns the local root directories, creating them via getOrCreateLocalRootDirsImpl on the first call and caching the result in localRootDirs.
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
if (localRootDirs == null) {
this.synchronized {
if (localRootDirs == null) {
localRootDirs = getOrCreateLocalRootDirsImpl(conf)
}
}
}
localRootDirs
}
getLocalDir
Description: returns the first of Spark's local root directories.
def getLocalDir(conf: SparkConf): String = {
getOrCreateLocalRootDirs(conf)(0)
}
getFormattedClassName
Description: returns the simple name of the object's class with any $ characters removed.
def getFormattedClassName(obj: AnyRef): String = {
obj.getClass.getSimpleName.replace("$", "")
}
nonNegativeHash
Description: computes a non-negative hash value for the given object.
def nonNegativeHash(obj: AnyRef): Int = {
// Required ?
if (obj eq null) return 0
val hash = obj.hashCode
// math.abs fails for Int.MinValue
val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
// Nothing else to guard against ?
hashAbs
}
nonNegativeMod
Description: computes x modulo mod; if the raw remainder rawMod is negative, rawMod + mod is returned, otherwise rawMod itself, so the result always lies in [0, mod).
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
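A quick worked example of why the extra branch is needed: Scala's % operator keeps the sign of the dividend, so without the correction a hash could map to a negative partition index. The snippet restates the method so the assertions are self-contained:
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
assert(-3 % 5 == -3)                 // plain % can be negative
assert(nonNegativeMod(-3, 5) == 2)   // result is always in [0, mod)
assert(nonNegativeMod(7, 5) == 2)    // non-negative inputs behave like %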
tryWithSafeFinally
Description: invokes block in a safe way: if an exception is thrown while executing finallyBlock in the finally clause, it must not suppress the exception thrown from block; instead it is recorded as a suppressed exception.
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
var originalThrowable: Throwable = null
try {
block
} catch {
case t: Throwable =>
// Purposefully not using NonFatal, because even fatal exceptions
// we don't want to have our finallyBlock suppress
originalThrowable = t
throw originalThrowable
} finally {
try {
finallyBlock
} catch {
case t: Throwable =>
if (originalThrowable != null) {
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
throw originalThrowable
} else {
throw t
}
}
}
}
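A hedged usage sketch (again assuming code inside an org.apache.spark package, since Utils is private[spark]; the helper name readFirstByte is made up): closing a stream in the finally block without letting a failing close() mask the original read error:
import java.io.FileInputStream
import org.apache.spark.util.Utils

def readFirstByte(path: String): Int = {
  val in = new FileInputStream(path)
  Utils.tryWithSafeFinally {
    in.read()     // if this throws, it remains the primary exception
  } {
    in.close()    // a failure here is attached as a suppressed exception instead
  }
}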
tryOrIOException
Description: executes a block that returns a value and rethrows any non-fatal, uncaught exception as an IOException.
def tryOrIOException[T](block: => T): T = {
try {
block
} catch {
case e: IOException =>
logError("Exception encountered", e)
throw e
case NonFatal(e) =>
logError("Exception encountered", e)
throw new IOException(e)
}
}
deleteRecursively
Description: deletes a file, or a directory together with its subdirectories and files, and removes the file or directory from the shutdown-hook manager ShutdownHookManager.
def deleteRecursively(file: File) {
if (file != null) {
try {
if (file.isDirectory && !isSymlink(file)) {
var savedIOException: IOException = null
for (child <- listFilesSafely(file)) {
try {
deleteRecursively(child)
} catch {
// In case of multiple exceptions, only last one will be thrown
case ioe: IOException => savedIOException = ioe
}
}
if (savedIOException != null) {
throw savedIOException
}
ShutdownHookManager.removeShutdownDeleteDir(file)
}
} finally {
if (!file.delete()) {
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
}
}
}
}
}
getSparkClassLoader
Description: returns the ClassLoader that loaded the current class.
def getSparkClassLoader = getClass.getClassLoader
bytesToString
Description: converts a number of bytes into a human-readable string such as "4.0 MB".
def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10
val (value, unit) = {
if (size >= 2*TB) {
(size.asInstanceOf[Double] / TB, "TB")
} else if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2*MB) {
(size.asInstanceOf[Double] / MB, "MB")
} else if (size >= 2*KB) {
(size.asInstanceOf[Double] / KB, "KB")
} else {
(size.asInstanceOf[Double], "B")
}
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}
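A few worked examples of the thresholds above: a unit is only used once the value reaches at least twice that unit, so 1L << 20 bytes is rendered as "1024.0 KB" (still below the 2 MB cutoff), 4 * 1024 * 1024 bytes as "4.0 MB", and 3L << 30 bytes as "3.0 GB".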
classForName
Description: loads the class with the given name.
def classForName(className: String): Class[_] = {
Class.forName(className, true, getContextOrSparkClassLoader)
}
getThreadDump
Description: obtains a dump of all threads.
def getThreadDump(): Array[ThreadStackTrace] = {
val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
threadInfos.sortBy(_.getThreadId).map(threadInfoToThreadStackTrace)
}
CallerContext
Description: a type provided alongside the Utils utilities for holding caller-context information (application, job, stage, and task identifiers), which can be published to Hadoop's CallerContext.
private[spark] class CallerContext(
from: String,
appId: Option[String] = None,
appAttemptId: Option[String] = None,
jobId: Option[Int] = None,
stageId: Option[Int] = None,
stageAttemptId: Option[Int] = None,
taskId: Option[Long] = None,
taskAttemptNumber: Option[Int] = None) extends Logging {
val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
val taskAttemptNumberStr =
if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
def setCurrentContext(): Unit = {
if (CallerContext.callerContextSupported) {
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
val builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
val hdfsContext = builder.getMethod("build").invoke(builderInst)
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
} catch {
case NonFatal(e) =>
logWarning("Fail to set Spark caller context", e)
}
}
}
}
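For example, constructing the context for a hypothetical task (the identifiers below are made up, and CallerContext is private[spark], so this is only a sketch of how Spark's own code uses it) yields the string "SPARK_TASK_application_123_JId_1_TId_5", which then appears in HDFS audit logs when the Hadoop version supports CallerContext:
val ctx = new CallerContext(
  "TASK",                           // where the call originates
  appId = Some("application_123"),  // hypothetical application id
  jobId = Some(1),
  taskId = Some(5L))
// ctx.context == "SPARK_TASK_application_123_JId_1_TId_5"
ctx.setCurrentContext()             // best effort: logs a warning if Hadoop lacks CallerContext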
memoryStringToMb
Description: converts a memory-size string to bytes, then converts the bytes to an integer number of MB.
def memoryStringToMb(str: String): Int = {
(JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
}
getMaxResultSize
Description: returns the limit, in bytes, on the total size of results (spark.driver.maxResultSize, 1g by default).
def getMaxResultSize(conf: SparkConf): Long = {
memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
}
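Worked through with the default value: memoryStringToMb("1g") yields 1024, and shifting left by 20 bits multiplies by 2^20, so the default limit is 1024L << 20 = 1073741824 bytes (1 GiB). A sketch of the same arithmetic, assuming the public JavaUtils class from spark-network-common is on the classpath:
import org.apache.spark.network.util.JavaUtils

val mb = (JavaUtils.byteStringAsBytes("1g") / 1024 / 1024).toInt  // 1024
val limitBytes = mb.toLong << 20                                  // 1073741824 bytes (1 GiB)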
tempFileWith
Description: returns the path of a temporary file derived from the given path (the original path plus a random UUID suffix).
def tempFileWith(path: File): File = {
new File(path.getAbsolutePath + "." + UUID.randomUUID())
}
copyStream
Description: copies the bytes from an input stream to an output stream.
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long =
{
var count = 0L
tryWithSafeFinally {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
val size = inChannel.size()
while (count < size) {
count += inChannel.transferTo(count, size - count, outChannel)
}
val finalPos = outChannel.position()
assert(finalPos == initialPos + size,
s"""
|Current position $finalPos do not equal to expected position ${initialPos + size}
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
} else {
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
count += n
}
}
}
count
} {
if (closeStreams) {
try {
in.close()
} finally {
out.close()
}
}
}
}
logUncaughtExceptions
Description: executes the function f, logging an error if it throws an uncaught exception, which is then rethrown.
def logUncaughtExceptions[T](f: => T): T = {
try {
f
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
throw t
}
}
getContextOrSparkClassLoader
Description: returns the current thread's context ClassLoader, or, if none is set, the ClassLoader that loaded Spark.
def getContextOrSparkClassLoader =
Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
doFetchFile
Description: fetches a file; for http, https, and ftp URLs the download is done with a URLConnection, while spark, file, and Hadoop file-system URIs are handled separately.
private def doFetchFile(
url: String,
targetDir: File,
filename: String,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
Option(uri.getScheme).getOrElse("file") match {
case "spark" =>
if (SparkEnv.get == null) {
throw new IllegalStateException(
"Cannot retrieve files with 'spark' scheme without an active SparkEnv.")
}
val source = SparkEnv.get.rpcEnv.openChannel(url)
val is = Channels.newInputStream(source)
downloadFile(url, is, targetFile, fileOverwrite)
case "http" | "https" | "ftp" =>
var uc: URLConnection = null
if (securityMgr.isAuthenticationEnabled()) {
logDebug("fetchFile with security enabled")
val newuri = constructURIForAuthentication(uri, securityMgr)
uc = newuri.toURL().openConnection()
uc.setAllowUserInteraction(false)
} else {
logDebug("fetchFile not using security")
uc = new URL(url).openConnection()
}
Utils.setupSecureURLConnection(uc, securityMgr)
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
val fs = getHadoopFileSystem(uri, hadoopConf)
val path = new Path(uri)
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
filename = Some(filename))
}
}
fetchFile
Description: if the file is cached locally, it is taken from the cache; otherwise it is downloaded remotely via doFetchFile. Afterwards, files in .tar or .tar.gz format are decompressed, and the chmod shell command is invoked to add a+x permission to the file.
def fetchFile(
url: String,
targetDir: File,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean) {
val fileName = decodeFileNameInURI(new URI(url))
val targetFile = new File(targetDir, fileName)
val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
if (useCache && fetchCacheEnabled) {
val cachedFileName = s"${url.hashCode}${timestamp}_cache"
val lockFileName = s"${url.hashCode}${timestamp}_lock"
val localDir = new File(getLocalDir(conf))
val lockFile = new File(localDir, lockFileName)
val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
val lock = lockFileChannel.lock()
val cachedFile = new File(localDir, cachedFileName)
try {
if (!cachedFile.exists()) {
doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
}
} finally {
lock.release()
lockFileChannel.close()
}
copyFile(
url,
cachedFile,
targetFile,
conf.getBoolean("spark.files.overwrite", false)
)
} else {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}
// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
// Windows does not grant read permission by default to non-admin users
// Add read permission to owner explicitly
if (isWindows) {
FileUtil.chmod(targetFile.getAbsolutePath, "u+r")
}
}
executeAndGetOutput
Description: executes a command and captures its output. It calls join on stdoutThread so that the current thread waits for stdoutThread to finish reading the output.
def executeAndGetOutput(
command: Seq[String],
workingDir: File = new File("."),
extraEnvironment: Map[String, String] = Map.empty,
redirectStderr: Boolean = true): String = {
val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
val output = new StringBuilder
val threadName = "read stdout for " + command(0)
def appendToOutput(s: String): Unit = output.append(s).append("\n")
val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
val exitCode = process.waitFor()
stdoutThread.join() // Wait for it to finish reading output
if (exitCode != 0) {
logError(s"Process $command exited with code $exitCode: $output")
throw new SparkException(s"Process $command exited with code $exitCode")
}
output.toString
}
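A hedged usage sketch (inside Spark's own packages, since Utils is private[spark]; the command and working directory are arbitrary examples):
import java.io.File
import org.apache.spark.util.Utils

// Runs "ls -l" in /tmp and returns whatever the process wrote to stdout;
// a non-zero exit code raises SparkException.
val listing: String = Utils.executeAndGetOutput(Seq("ls", "-l"), new File("/tmp"))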
extractHostPortFromSparkUrl
Description: extracts the host and port from a Spark URL of the form spark://host:port.
@throws(classOf[SparkException])
def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = {
try {
val uri = new java.net.URI(sparkUrl)
val host = uri.getHost
val port = uri.getPort
if (uri.getScheme != "spark" ||
host == null ||
port < 0 ||
(uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null
uri.getFragment != null ||
uri.getQuery != null ||
uri.getUserInfo != null) {
throw new SparkException("Invalid master URL: " + sparkUrl)
}
(host, port)
} catch {
case e: java.net.URISyntaxException =>
throw new SparkException("Invalid master URL: " + sparkUrl, e)
}
}
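For instance, "spark://master-host:7077" parses to ("master-host", 7077), while a URL carrying a path, query, fragment, or user info, such as "spark://master-host:7077/extra", is rejected with a SparkException. A minimal standalone sketch of the same parsing with java.net.URI (the host and port are made up):
val uri = new java.net.URI("spark://master-host:7077")
assert(uri.getScheme == "spark")
assert(uri.getHost == "master-host" && uri.getPort == 7077)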
isDynamicAllocationEnabled
Description: determines whether dynamic resource allocation is enabled.
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
dynamicAllocationEnabled &&
(!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
}
About 《Spark内核设计的艺术 架构设计与实现》
After nearly a year of preparation, the book 《Spark内核设计的艺术 架构设计与实现》 has now been published.