1.3 Shuffle机制
1.3.1 Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
1.3.2 Partition
1、问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照收集归属地不同省份输出到不同文件中。
2、默认Partitioner分区
默认分区时根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key储存到哪个分区
3、自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 控制分区代码逻辑 … … return partition; } }
(2)在驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相对应数量的ReduceTask
job.setNumReduceTasks(5); //我的逻辑分区数量是5
(4)分区总结
(a)如果ReduceTask的数量大于getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(b)如果1 小于 ReduceTask的数量 小于 getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(c)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
(d)分区号必须从零开始,逐一累加。
1.3.3 Partition 分区案例实操
1、需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据文件(百度网盘自取数据文件)
链接:https://pan.baidu.com/s/1i2FdQTWFijkrr29n9xAj8Q
提取码:zhm6
(2)手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到
一个文件中。
2、需求分析
3、在序列化实操的基础上,增加一个分区类
package org.example.fenqu; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * @ClassName MyPartition * @Description TODO * @Author Zouhuiming * @Date 2023/5/21 11:29 * @Version 1.0 */ public class MyPartition extends Partitioner<Text,FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int i) { String phone=text.toString(); String prePhone=phone.substring(0,3); //定义分区编号 int partition; if ("136".equals(prePhone)){ partition=0; }else if ("137".equals(prePhone)){ partition=1; }else if ("138".equals(prePhone)){ partition=2; }else if ("139".equals(prePhone)){ partition=3; } else { partition=4; } return partition; } }
(4)在驱动函数中增加自定义数据分区设置和ReduceTask设置
package org.example.fenqu; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * @ClassName MyPartition * @Description TODO * @Author Zouhuiming * @Date 2023/5/21 11:29 * @Version 1.0 */ public class MyPartition extends Partitioner<Text,FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int i) { String phone=text.toString(); String prePhone=phone.substring(0,3); //定义分区编号 int partition; if ("136".equals(prePhone)){ partition=0; }else if ("137".equals(prePhone)){ partition=1; }else if ("138".equals(prePhone)){ partition=2; }else if ("139".equals(prePhone)){ partition=3; } else { partition=4; } return partition; } }
1.3.4 WritableComparable排序
1·、排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序
对于MapTask,它将会处理的结果暂时存放到环形缓冲区中,当环形缓冲区使用率达到一定阙值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所以文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阙值,则溢写到磁盘上,否者储存在内存中。如果磁盘上文件数目达到一定阙值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定的阙值,则进行一次合并后将数据溢写到磁盘上。当所以数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
2、排序的分类
(1)部分排序
MapReduce根据输入记录的键值对数据集进行排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序(GroupingComparator)
在Reduce端对key进行分组。应用于:在接受的key为bean对象的时候,想让一个或几个这段相同(全部字段比较不相同)的key进入到同一个Reduce方法,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
3、自定义排序WritableComparable 原理分析
bean对象作为key传输,需要实现WritableComparable接口重写compareTo()方法,就可以实现排序
@Override public int compareTo(FlowBean bean) { int result; // 按照总流量大小,倒序排列 if (this.sumFlow > bean.getSumFlow()) { result = -1; }else if (this.sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }
1.3.5 WritableComparable排序案例实操(全排序)
1、需求
根据序列化案例(上一篇文章最后一个案例)产生的结果再次对总流量进行倒序排序。
2、需求分析
3、代码实现
1、FlowBean对象在需求1基础上增加了比较功能
package org.example.paixu; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @ClassName FlowBean * @Description TODO * @Author Zouhuiming * @Date 2023/5/20 10:13 * @Version 1.0 */ public class FlowBean implements WritableComparable<FlowBean> { private Long upFlow; private Long downFlow; private long sumFlow; public FlowBean() { } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public void setSumFlow() { this.sumFlow = this.upFlow+this.downFlow; } //实现序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //实现反序列化 @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow=dataInput.readLong(); this.downFlow=dataInput.readLong(); this.sumFlow=dataInput.readLong(); } @Override public String toString() { return upFlow+"\t"+downFlow+"\t"+sumFlow; } @Override public int compareTo(FlowBean o) { //倒序排列 return this.sumFlow>o.sumFlow?-1:1; } }
2、编写Mapper类
package org.example.paixu; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @ClassName FlowMapper * @Description TODO * @Author Zouhuiming * @Date 2023/5/20 10:21 * @Version 1.0 */ public class FlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> { private Text outV=new Text(); private FlowBean outK=new FlowBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException { String s = value.toString(); String[] split = s.split("\t"); String phone=split[0]; outV.set(phone); outK.setUpFlow(Long.parseLong(split[1])); outK.setDownFlow(Long.parseLong(split[2])); outK.setSumFlow(); context.write(outK,outV); } }
3、编写Reducer类
package org.example.paixu; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @ClassName FlowReducer * @Description TODO * @Author Zouhuiming * @Date 2023/5/20 10:29 * @Version 1.0 */ public class FlowReducer extends Reducer <FlowBean, Text,Text,FlowBean>{ @Override protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value,key); } } }
4、编写Driver类
package org.example.paixu; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @ClassName FlowDriver * @Description TODO * @Author Zouhuiming * @Date 2023/5/20 10:38 * @Version 1.0 */ public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取job对象 Configuration configuration=new Configuration(); Job job=Job.getInstance(configuration); //2、关联本Driver类 job.setJarByClass(FlowDriver.class); //3、关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4、设置Map端输出KV类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); //5、设置程序最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class) //6、设置程序的输入和输出路径 FileInputFormat.addInputPath(job,new Path("E:\\test\\output1")); FileOutputFormat.setOutputPath(job,new Path("E:\\test\\output2")); //7、提交Job System.exit(job.waitForCompletion(true)?0:1); } }
1.3.6 WritableComparable排序案例实操(区内排序)
前提:这个案例和上一个案例区别不大,就是增加了分区而已,所以只需要增加自定义分区类和修改一个Driver类就行,像Mapper类和Reducer类是不用修改的。
(1)增加自定义分区类
package org.example.paixu; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * @ClassName MyPartition * @Description TODO * @Author Zouhuiming * @Date 2023/5/21 11:29 * @Version 1.0 */ public class MyPartition extends Partitioner< FlowBean,Text> { @Override public int getPartition(FlowBean flowBean, Text text, int i) { String phone=text.toString(); String prePhone=phone.substring(0,3); //定义分区编号 int partition; if ("136".equals(prePhone)){ partition=0; }else if ("137".equals(prePhone)){ partition=1; }else if ("138".equals(prePhone)){ partition=2; }else if ("139".equals(prePhone)){ partition=3; } else { partition=4; } return partition; } }
(2)修改Driver
package org.example.paixu; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @ClassName FlowDriver * @Description TODO * @Author Zouhuiming * @Date 2023/5/20 10:38 * @Version 1.0 */ public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取job对象 Configuration configuration=new Configuration(); Job job=Job.getInstance(configuration); //2、关联本Driver类 job.setJarByClass(FlowDriver.class); //3、关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4、设置Map端输出KV类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); //5、设置程序最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //8、指定自定义分区器 job.setPartitionerClass(MyPartition.class); //9、同时指定相应数量的ReduceTask job.setNumReduceTasks(5); //6、设置程序的输入和输出路径 FileInputFormat.addInputPath(job,new Path("E:\\test\\output1")); FileOutputFormat.setOutputPath(job,new Path("E:\\test\\output2")); //7、提交Job System.exit(job.waitForCompletion(true)?0:1); } }
1.3.7 Combiner 合并
Combiner是MR程序中Mapper和Reducer之外的一种组件
Combiner组件的父类就是Reducer
Combiner和Reducer的区别就是运行位置
Combiner是在每一个MapTask所在的节点运行
Reducer是接受全局所以Mapper的输出结果
Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量
Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combine的输出kv应该跟Reducer的输入kv类型要对应起来
1、自定义Combiner实现步骤
(1)自定义一个Combiner继承Reducer,重写Reduce()方法
package org.example._07Combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @ClassName CombinerClass * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 9:11 * @Version 1.0 */ public class CombinerClass extends Reducer<Text, IntWritable,Text, IntWritable> { private IntWritable outV=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum=0; for (IntWritable value : values) { sum+=value.get(); } outV.set(sum); context.write(key,outV); } }
(2)在Job驱动类中设置
job.setCombinerClass(WordCountCombiner.class);
1.3.8 合并案例实操
1、需求(我的还是单词统计的代码)
统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用Combiner 功能。
(1)数据输入
自己创造一个就行了吧(到这应该会自己创造自己需要用的数据)
(2)期望数据输出
自己算吧
2、案例实操
(1)增加一个类继承Reducer
package org.example._07Combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @ClassName CombinerClass * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 9:11 * @Version 1.0 */ public class CombinerClass extends Reducer<Text, IntWritable,Text, IntWritable> { private IntWritable outV=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum=0; for (IntWritable value : values) { sum+=value.get(); } outV.set(sum); context.write(key,outV); } }
(2)在驱动类中指定Combiner
job.setCombinerClass(CombinerClass.class);
说明:其实这里也可以直接填以下代码
job.setCombinerClass(WordCountReduce.class);