MapReduce详解(2)

简介: MapReduce详解

3.2 InputFormat数据输入

2.FileInputFormat操作流程


(1)找到你数据存储的目录。


(2)开始遍历处理(规划切片)目录下的每一个文件


(3)遍历第一个文件xx.txt


a)获取文件大小fs.sizeOf(xx.txt)


b)默认情况下,切片大小=blocksize


c)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)  剩下部分大于128m但是小于140m


e)将切片信息写到一个切片规划文件中


f)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。使用InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等


h)注意:block是HDFS物理上存储的数据,切片是对数据逻辑上的划分


(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。


3.2.2 FileInputFormat切片机制

1.FileInputFormat中默认的切片机制


(1)简单地按照文件的内容长度进行切片


(2)切片大小,默认等于block大小


(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片


比如待处理数据有两个文件:


file1.txt    320M
file2.txt    10M


经过FileInputFormat的切片机制运算后,形成的切片信息如下:  


file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M


3.2.3 CombineTextInputFormat切片机制

关于大量小文件的优化策略


默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。


1.优化策略


(1)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。


(2)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。


(3)优先满足最小切片大小,不超过最大切片大小


CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m  
CombineTextInputFormat.setMinInputSplitSize(job, 2097152); // 2m


举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m


2.具体实现步骤


//  如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m


3.3 MapTask工作机制

3.3.1 并行度决定机制

1.问题引出


maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,mapTask并行任务是否越多越好呢?


2.MapTask并行度决定机制


一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定,如图所示。


其.png


3.3.2 MapTask工作机制

MapTask工作机制如图所示

前.png



(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。


(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。


(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用context.write,context.write底层 OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。


(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并等操作。


(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。


在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。


3.4 Shuffle机制

3.4.1 Shuffle机制(洗牌)

Mapreduce确保每个reducer的输入都是按key排序的。系统执行排序的过程(即将mapper输出作为输入传给reducer)称为shuffle,如图所示。


我.png


3.4.2 Partition分区

分区:把数据扎堆存放


问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)


默认partition分区 hello-->hash%reducetask数量--2 ;0,1

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}


默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。


2.自定义Partitioner步骤


(1)自定义类继承Partitioner,重写getPartition()方法


public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}

(2)在job驱动中,设置自定义partitioner:


job.setPartitionerClass(CustomPartitioner.class);


(3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task


job.setNumReduceTasks(5);


注意

reduceTask的个数决定了有几个文件!!


如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;


如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;


如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;


例如:假设自定义分区数为5,则


(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件


(2)job.setNumReduceTasks(2);会报错


(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件


3.4.3 WritableComparable排序

排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序。


对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。


对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。


每个阶段的默认排序


1.排序的分类


(1)部分排序:


MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。


(2)全排序:


如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。


替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。


2.自定义排序WritableComparable


(1)原理分析


bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序


@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
if(this.sumFlow==o.getSumFlow()){
This.downFlow>o.getDownFlow() ? -1 :1
}
}

3.5 ReduceTask工作机制

我.png


1.设置ReduceTask并行度(个数)


reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:


//默认值是1,手动设置为4
job.setNumReduceTasks(4);


2.注意


(1)reducetask=0 ,表示没有reduce阶段,输出文件个数和map个数一致。


(2)reducetask默认值就是1,所以输出文件个数为一个。


(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜


(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。


(5)具体多少个reducetask,需要根据集群性能而定。


(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行。


3.实验:测试reducetask多少合适。


(1)实验环境:1个master节点,16个slave节点:CPU:8GHZ,内存: 2G


(2)实验结论:


表4-3 改变reduce task (数据量为1GB)


Map task =16

Reduce task

1

5

10

15

16

20

25

30

45

60

总时间

892

146

110

92

88

100

128

101

145

104



4.ReduceTask工作机制


(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。


(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。


(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。


(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。


相关文章
|
分布式计算 数据处理
38 MAPREDUCE中的其他应用
38 MAPREDUCE中的其他应用
62 0
|
分布式计算
37 MAPREDUCE中的DistributedCache应用
37 MAPREDUCE中的DistributedCache应用
44 0
|
存储 分布式计算 监控
19 为什么要MAPREDUCE?
19 为什么要MAPREDUCE?
72 0
|
数据采集 机器学习/深度学习 存储
E-MapReduce
E-MapReduce(简称EMR)是阿里云提供的一项大数据处理服务,它基于开源的 Apache Hadoop 和 Apache Spark 构建,并提供了易于使用的 Web 界面和 API 接口,方便用户快速创建、调度和管理大数据处理作业。
266 2
|
分布式计算 并行计算 大数据
初识MapReduce
初识MapReduce
95 0
|
缓存 分布式计算 NoSQL
MapReduce(二)
MapReduce(二)
110 0
MapReduce(二)
|
存储 分布式计算 资源调度
|
存储 缓存 分布式计算
MapReduce —— 历久而弥新(1)
MapReduce —— 历久而弥新(1)
184 0
MapReduce —— 历久而弥新(1)
|
分布式计算 Hadoop Java
MapReduce使用
MapReduce使用
115 0
MapReduce使用
|
存储 分布式计算 监控
MapReduce —— 历久而弥新(2)
MapReduce —— 历久而弥新(2)
151 0