MapReduce 实验:二次排序

简介: MapReduce 实验:二次排序

前言

通过hadoop进行二次排序,不打乱第一次的顺序结果,通过eclipse编写排序规则,完成数据的二次排序规则。

实验目的

基于 MapReduce 思想,编写 SecondarySort 程序。

实验要求

要能理解 MapReduce 编程思想,会编写 MapReduce 版本二次排序程序,然后将其执行并分析执行过程。

实验原理

MR 默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在 reduce 阶段排序收集过来的 values,但是,如果有数量巨大的 values 可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到 MR 计算过程之中,而不是单独来做。

二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

实验步骤

编写程序

程序主要难点在于排序和聚合。

对于排序我们需要定义一个 IntPair 类用于数据的存储,并在 IntPair 类内部自定义

Comparator 类以实现第一字段和第二字段的比较。

对于聚合我们需要定义一个 FirstPartitioner 类,在 FirstPartitioner 类内部指定聚合规则为第一字段。

此外,我们还需要开启 MapReduce 框架自定义 Partitioner 功能和 GroupingComparator 功能。

IntPair

1. package mr; 
2. 
3. import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; 
4. import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; 
5. public class IntPair implements WritableComparable {     private IntWritable first;     private IntWritable second;     public void set(IntWritable first, IntWritable second) {         this.first = first;         this.second = second; 
6.     } 
7. //注意:需要添加无参的构造方法,否则反射时会报错。 
8. public IntPair() {         set(new IntWritable(), new IntWritable()); 
9.     }     public IntPair(int first, int second) {         set(new IntWritable(first), new IntWritable(second)); 
10.     }     public IntPair(IntWritable first, IntWritable second) {         set(first, second); 
11.     }     public IntWritable getFirst() {         return first; 
12.     }     public void setFirst(IntWritable first) {         this.first = first; 
13.     }     public IntWritable getSecond() {         return second; 
14.     }     public void setSecond(IntWritable second) {         this.second = second; 
15.     } 
16. public void write(DataOutput out) throws IOException {         first.write(out);         second.write(out); 
17.     } 
18. public void readFields(DataInput in) throws IOException {         first.readFields(in);         second.readFields(in); 
19.     } 
20. public int hashCode() {         return first.hashCode() * 163 + second.hashCode(); 
21.     }     public boolean equals(Object o) {         if (o instanceof IntPair) {             IntPair tp = (IntPair) o;             return first.equals(tp.first) && second.equals(tp.second); 
22.         }         return false; 
23.     }     public String toString() {         return first + "\t" + second; 
24.     } 
25. public int compareTo(Object o) {         IntPair tp=(IntPair) o;         int cmp = first.compareTo(tp.first);         if (cmp != 0) {             return cmp; 
26.         }         return second.compareTo(tp.second); 
27.     } 
28. }

完整代码

1. package mr;  import java.io.IOException; 
2. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
3.  public class SecondarySort {     static class TheMapper extends Mapper<LongWritable, Text, IntPair, 
4. NullWritable> {         @Override         protected void map(LongWritable key, Text value, Context context)                 throws IOException, InterruptedException {             String[] fields = value.toString().split("\t");             int field1 = Integer.parseInt(fields[0]);             int field2 = Integer.parseInt(fields[1]);             context.write(new IntPair(field1,field2), NullWritable.get()); 
5.         }     } 
6.     static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, 
7. NullWritable> { 
8.         //private static final Text SEPARATOR = new Text("---------------------
9. ---------------------------"); 
10.         @Override 
11.         protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)                 throws IOException, InterruptedException {             context.write(key, NullWritable.get()); 
12.         }     } 
13.     public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {         public int getPartition(IntPair key, NullWritable value,                 int numPartitions) {             return Math.abs(key.getFirst().get()) % numPartitions; 
14.         } 
15.     } 
16. //如果不添加这个类,默认第一列和第二列都是升序排序的。 
17. //这个类的作用是使第一列升序排序,第二列降序排序 
18.     public static class KeyComparator extends WritableComparator { 
19. //无参构造器必须加上,否则报错。 
20.         protected KeyComparator() {             super(IntPair.class, true); 
21.         } 
22.         public int compare(WritableComparable a, WritableComparable b) { 
23.             IntPair ip1 = (IntPair) a; 
24.             IntPair ip2 = (IntPair) b; 
25. //第一列按升序排序 
26.             int cmp = ip1.getFirst().compareTo(ip2.getFirst());             if (cmp != 0) {                 return cmp; 
27.             } 
28. //在第一列相等的情况下,第二列按倒序排序 
29. return -ip1.getSecond().compareTo(ip2.getSecond()); 
30.         } 
31.     } 
32. //入口程序 
33.     public static void main(String[] args) throws Exception { 
34.         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setJarByClass(SecondarySort.class); 
35. //设置Mapper的相关属性 
36.         job.setMapperClass(TheMapper.class); 
37. //当Mapper中的输出的key和value的类型和Reduce输出 //的key和value的类型相同时,以下两句可以省略。 
38.         //job.setMapOutputKeyClass(IntPair.class); 
39.         //job.setMapOutputValueClass(NullWritable.class); 
40.         FileInputFormat.setInputPaths(job, new Path(args[0])); 
41. //设置分区的相关属性 
42.         job.setPartitionerClass(FirstPartitioner.class); 
43. //在map中对key进行排序 
44.         job.setSortComparatorClass(KeyComparator.class); 
45.         //job.setGroupingComparatorClass(GroupComparator.class); 
46. //设置Reducer的相关属性 
47.         job.setReducerClass(TheReducer.class);         job.setOutputKeyClass(IntPair.class);         job.setOutputValueClass(NullWritable.class); 
48.         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
49. //设置Reducer数量         int reduceNum = 1;         if(args.length >= 3 && args[2] != null){             reduceNum = Integer.parseInt(args[2]); 
50.         }         job.setNumReduceTasks(reduceNum);         job.waitForCompletion(true); 
51.     } 
52. }

输入数据

 

1. 输入数据如下:secsortdata.txt ('\t'分割)(数据放在/root/data/6 目录下): 
2. 7    444 
3. 3    9999 
4. 7    333 
5. 4    22 
6. 3    7777 
7. 7    555 
8. 3    6666 
9. 6    0 
10. 3 8888 
11. 4 11

 

在 master 上执行对 hdfs 上的文件/user/mapreduce/secsort/out/part-r-00000 内容查看的操作

[root@master hadoop]# bin/hadoop fs -cat  /user/mapreduce/secsort/out/p*

bin/hadoop fs -cat  /user/mapreduce/secsort/out/p*

如图 所示:

总结

通过实验,我们可以了解到MR 默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在 reduce 阶段排序收集过来的 values,但是,如果有数量巨大的 values 可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到 MR 计算过程之中,而不是单独来做。


相关文章
|
9月前
|
存储 编解码 分布式计算
云计算与大数据实验六 MapReduce综合应用
云计算与大数据实验六 MapReduce综合应用
171 0
|
9月前
|
分布式计算 Java Hadoop
云计算与大数据实验五 MapReduce编程
云计算与大数据实验五 MapReduce编程
222 0
|
分布式计算 Hadoop Java
Mapreduce实验之wordcount
利用hadoop函数,标准输出输出堆中的k个单词与频次。
Mapreduce实验之wordcount
|
分布式计算 Hadoop
MapReduce 二次排序详解
1 首先说一下工作原理: 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
877 0
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
40 1
|
7月前
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)