Spark RDD概念学习系列之RDD的转换(十)-阿里云开发者社区

开发者社区> 大数据> 正文
登录阅读全文

Spark RDD概念学习系列之RDD的转换(十)

简介:

 RDD的转换

      

   Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

   Spark Scala版本的Word Count程序如下:

1
2
3
4
5
1: <span style="color: #ff0000;"><strong>val file = spark.textFile("hdfs://...")</strong></span>
2: <strong><span style="color: #ff0000;">val counts = file.flatMap(line => line.split(" "))</span></strong>
3: <strong><span style="color: #ff0000;"> .map(word => (word, 1))</span></strong>
4: <strong><span style="color: #ff0000;"> .reduceByKey(_ + _)</span></strong>
5:<strong><span style="color: #ff0000;"> counts.saveAsTextFile("hdfs://...")</span></strong>

     file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?

 

     1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

 

   2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD

 

    3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

 

    4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

 

    5)行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache.spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

 

    RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。  

    RDD的依赖关系(宽依赖和窄依赖)

 

 

如,假设,现在如下

 

 

 

 

所以,

比如,我这里是刚好是4台worker1、worker2、worker3、worker4。还有1台Master。

 

 soga,

1
<span style="color: #ff0000;"><strong>val file = spark.textFile("hdfs://...")<br></strong></span>

   1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

 

         想要成为高手,一定要多看源码,看上几十遍都太少了,包括看上10个版本的源码。无论是hadoop、还是spark

 

1
<span style="color: #ff0000;"><strong>val counts = file.flatMap(line => line.split(" "))</strong></span><br> 2)<strong>行2</strong>:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到<strong>MapPartitionsRDD</strong>。<br><br><br>

1
<span style="color: #ff0000;"><strong>.map(word => (word, 1))</strong></span><br>    3)<strong>行3</strong>:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个<strong>MapPartitionsRDD</strong>中。

 

 

 

 

 

 

至此,windows本地,已经完成了。

 

下面是在网络里了。

         注意啦! 分区是计算概念,分片是数据概念。

有4台worker,每台都在自己内存计算。

1
<strong>.reduceByKey(_ + _)</strong>

 4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

 

 

 

 

 

总结:

 

第一个stage :

         HadoopRDD  ->   MapPartitionRDD  ->   MapPartitionsRDD  ->  MapPartitionsRDD  ->  MapPartitionsRDD

 

第二个stage :   

  Stage shuffledRDD   ->  MapPartitionsRDD

 



本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5723764.html,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享: