版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77450157
注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。
书中附录H的内容都在本文呈现。
RpcUtils是RpcEnv中经常用到的工具类,这里讲简要介绍其中提供的方法。lookupRpcTimeout
功能描述:根据提供的配置属性列表获取Rpc查找的超时时间,spark.rpc.lookupTimeout属性的优先级更高 。RpcTimeout是一个伴生对象,此处实际调用了RpcTimeout的apply方法,感兴趣的读者自行阅读其实现。 def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
makeDriverRef
功能描述:根据RpcEndpoint的名称向远端的NettyRpcEnv询问获取相关RpcEndpoint的RpcEndpointRef 。 def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
numRetries
功能描述:从SparkConf中获取的Rpc最大重新连接次数,可以使用spark.rpc.numRetries属性进行配置,默认为3次。 def numRetries(conf: SparkConf): Int = {
conf.getInt("spark.rpc.numRetries", 3)
}
retryWaitMs
功能描述:从SparkConf中获取的Rpc每次重新连接需要等待的毫秒数,可以使用spark.rpc.retry.wait属性进行配置,默认值为3秒。 def retryWaitMs(conf: SparkConf): Long = {
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
}
askRpcTimeout
功能描述:从SparkConf中获取Rpc的ask操作的默认超时时间,可以使用spark.rpc.askTimeout或者spark.network.timeout属性进行配置,默认值为120秒。其实现使用了伴生对象RpcTimeout的apply方法,作用为从属性列表中挑出第一个属性的值作为Rpc的ask操作的默认超时时间,因此spark.rpc.askTimeout属性的优先级更高。 def askRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}
maxMessageSizeBytes
功能描述:从SparkConf中获取最大的Rpc消息的大小,单位是MB。 def maxMessageSizeBytes(conf: SparkConf): Int = {
val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
throw new IllegalArgumentException(
s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
}
maxSizeInMB * 1024 * 1024
}
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《 Spark内核设计的艺术 架构设计与实现 》一书现已出版发行,图书如图:
纸质版售卖链接如下:
电子版售卖链接如下: