RDD 算子_转换_重分区 | 学习笔记

简介: 快速学习 RDD 算子_转换_重分区

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段 RDD 算子_转换_重分区】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11978


RDD 算子_转换_重分区


内容介绍

一、前言

二、介绍 repartition coalesce 的用法

三、总结

 

一、前言

要想让 RDD 运行的更快,就要控制它的并行度,或者想更节省集群资源,也要控制他的并行度。控制分区数是控制并行度的手段之一。控制分区数还有很多其他的作用。接下来去了解两个算计,一个叫做 repartition ,一个叫做 coalesce ,它们都用于改变分区数。

 

二、介绍 repartition coalesce 的用法

我们可以在 RDD 中使用改变分区数,现在我们直接用代码来说明。首先,创建新方法和集合。我们是有两个算子可以改变分区数的,是 repartition 和 coalesce ,所以我们要创建两个集合。使用repartition 可以指定任何的分区数,在 repartition 后可以获取到所有的partitions ,然后获取 partitions 的个数,接下来运行下 repartition 的代码,看看结果的转变,

代码如下:

@Test

def partitioning() : Unit = {

val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)

println(rdd.repartition(numpartitions = 5).partitions.size)

}

然后我们将分区数5改为1,再运行一遍,看看结果,代码如下:

@Test

def partitioning() : Unit = {

val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)

println(rdd.repartition(numpartitions = 1).partitions.size)

}

根据这个结果可以说明的是, repartition 改变的分区数是可大可小的。

我们另一个算子叫 coalesce ,修改代码中的部分内容,然后运行代码,看看结果是怎么样的,代码如下:

@Test

def partitioning() : Unit = {

val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)

println(rdd.coalesce(numpartitions = 1).partitions.size)

}

Repartition 是重分区, coalesce 是减少或者合并。然后我们将分区数1改为5,我们再次运行代码,看看结果是怎么样的,

代码如下:

@Test

def partitioning() : Unit = {

val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)

println(rdd.coalesce(numpartitions = 5).partitions.size)

}

可以发现结果不是5而是2,原因是5大于原始的分区数。目前可能会认为 coalesce 只能减少分区数,不能增大分区数,其实不是这样的,我们稍稍做一点点改动,即在允许 coalesce 的时候,允许进行 Shuffle 操作,然后再次运行一下,看看结果是如何,

代码如下:

@Test

def partitioning() : Unit = {

val rdd = sc.parallelize(Seq(1,2,3,4,5),numSlices = 2)

println(rdd.coalesce(numpartitions = 5,shuffle = true).partitions.size)

}

1.png

可以发现分区的结果变为5了。也就是说 coalesce 没有默认进行 Shuffle 操作,而 repartition 是默认可以进行 Shuffle 操作的重分区,因此 repartition 可以把分区增大,也可以把分区变小,但 coalesce 在默认情况下,只能把分区数变小。

 

三、总结

简单总结一下, repartition 进行重分区的时,默认 Shuffle 操作, coalesce进行重分区的时,没有默认 Shuffle 操作,即 coalesce 默认不能增大分区数,这是它们的区别。这就是操作分区数的两个算子。

相关文章
|
负载均衡 Java 应用服务中间件
Caddy Web服务器深度解析与对比:Caddy vs. Nginx vs. Apache
Caddy Web服务器深度解析与对比:Caddy vs. Nginx vs. Apache
1485 0
|
消息中间件 缓存 资源调度
在 Flink 算子中使用多线程如何保证不丢数据?
本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。
在 Flink 算子中使用多线程如何保证不丢数据?
|
10月前
|
数据库连接 数据库 DataX
数据接入方案
数仓平台可直连或通过从库、堡垒机、FTP/SFTP等方式接入业务数据库,需提供可读用户权限及相应连接方式。若无法直连,可通过提供数据文件或脚本处理实现数据导入。
487 7
数据接入方案
|
算法
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
211 2
|
索引 Python
Pandas学习笔记之Dataframe
Pandas学习笔记之Dataframe
|
Java 调度 流计算
基于多线程的方式优化 FLink 程序
这篇内容介绍了线程的基本概念和重要性。线程是程序执行的最小单位,比进程更细粒度,常用于提高程序响应性和性能。多线程可以实现并发处理,利用多核处理器,实现资源共享和复杂逻辑。文章还讨论了线程的五种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED)以及如何在Java中创建和停止线程。最后提到了两种停止线程的方法:使用标识和中断机制。
452 5
|
Java 开发工具
JVM参数太多?一网打尽常用JVM参数!
JVM参数太多?一网打尽常用JVM参数!
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之look up hint 没有生效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
1286 1
|
SQL 分布式计算 资源调度
一文看懂 Hive 优化大全(参数配置、语法优化)
以下是对提供的内容的摘要,总长度为240个字符: 在Hadoop集群中,服务器环境包括3台机器,分别运行不同的服务,如NodeManager、DataNode、NameNode等。集群组件版本包括jdk 1.8、mysql 5.7、hadoop 3.1.3和hive 3.1.2。文章讨论了YARN的配置优化,如`yarn.nodemanager.resource.memory-mb`、`yarn.nodemanager.vmem-check-enabled`和`hive.map.aggr`等参数,以及Map-Side聚合优化、Map Join和Bucket Map Join。
898 0