开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段: RDD 算子_转换_重分区】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11978
RDD 算子_转换_重分区
内容介绍
一、前言
二、介绍 repartition 和coalesce 的用法
三、总结
一、前言
要想让 RDD 运行的更快,就要控制它的并行度,或者想更节省集群资源,也要控制他的并行度。控制分区数是控制并行度的手段之一。控制分区数还有很多其他的作用。接下来去了解两个算计,一个叫做 repartition ,一个叫做 coalesce ,它们都用于改变分区数。
二、介绍 repartition 和 coalesce 的用法
我们可以在 RDD 中使用改变分区数,现在我们直接用代码来说明。首先,创建新方法和集合。我们是有两个算子可以改变分区数的,是 repartition 和 coalesce ,所以我们要创建两个集合。使用repartition 可以指定任何的分区数,在 repartition 后可以获取到所有的partitions ,然后获取 partitions 的个数,接下来运行下 repartition 的代码,看看结果的转变,
代码如下:
@Test
def partitioning() : Unit = {
val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)
println(rdd.repartition(numpartitions = 5).partitions.size)
}
然后我们将分区数5改为1,再运行一遍,看看结果,代码如下:
@Test
def partitioning() : Unit = {
val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)
println(rdd.repartition(numpartitions = 1).partitions.size)
}
根据这个结果可以说明的是, repartition 改变的分区数是可大可小的。
我们另一个算子叫 coalesce ,修改代码中的部分内容,然后运行代码,看看结果是怎么样的,代码如下:
@Test
def partitioning() : Unit = {
val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)
println(rdd.coalesce(numpartitions = 1).partitions.size)
}
Repartition 是重分区, coalesce 是减少或者合并。然后我们将分区数1改为5,我们再次运行代码,看看结果是怎么样的,
代码如下:
@Test
def partitioning() : Unit = {
val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)
println(rdd.coalesce(numpartitions = 5).partitions.size)
}
可以发现结果不是5而是2,原因是5大于原始的分区数。目前可能会认为 coalesce 只能减少分区数,不能增大分区数,其实不是这样的,我们稍稍做一点点改动,即在允许 coalesce 的时候,允许进行 Shuffle 操作,然后再次运行一下,看看结果是如何,
代码如下:
@Test
def partitioning() : Unit = {
val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)
println(rdd.coalesce(numpartitions = 5,shuffle = true).partitions.size)
}
可以发现分区的结果变为5了。也就是说 coalesce 没有默认进行 Shuffle 操作,而 repartition 是默认可以进行 Shuffle 操作的重分区,因此 repartition 可以把分区增大,也可以把分区变小,但 coalesce 在默认情况下,只能把分区数变小。
三、总结
简单总结一下, repartition 进行重分区的时,默认 Shuffle 操作, coalesce进行重分区的时,没有默认 Shuffle 操作,即 coalesce 默认不能增大分区数,这是它们的区别。这就是操作分区数的两个算子。