前言
通过hadoop进行二次排序,不打乱第一次的顺序结果,通过eclipse编写排序规则,完成数据的二次排序规则。
实验目的
基于 MapReduce 思想,编写 SecondarySort 程序。
实验要求
要能理解 MapReduce 编程思想,会编写 MapReduce 版本二次排序程序,然后将其执行并分析执行过程。
实验原理
MR 默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在 reduce 阶段排序收集过来的 values,但是,如果有数量巨大的 values 可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到 MR 计算过程之中,而不是单独来做。
二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。
实验步骤
编写程序
程序主要难点在于排序和聚合。
对于排序我们需要定义一个 IntPair 类用于数据的存储,并在 IntPair 类内部自定义
Comparator 类以实现第一字段和第二字段的比较。
对于聚合我们需要定义一个 FirstPartitioner 类,在 FirstPartitioner 类内部指定聚合规则为第一字段。
此外,我们还需要开启 MapReduce 框架自定义 Partitioner 功能和 GroupingComparator 功能。
IntPair 类
1. package mr; 2. 3. import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; 4. import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; 5. public class IntPair implements WritableComparable { private IntWritable first; private IntWritable second; public void set(IntWritable first, IntWritable second) { this.first = first; this.second = second; 6. } 7. //注意:需要添加无参的构造方法,否则反射时会报错。 8. public IntPair() { set(new IntWritable(), new IntWritable()); 9. } public IntPair(int first, int second) { set(new IntWritable(first), new IntWritable(second)); 10. } public IntPair(IntWritable first, IntWritable second) { set(first, second); 11. } public IntWritable getFirst() { return first; 12. } public void setFirst(IntWritable first) { this.first = first; 13. } public IntWritable getSecond() { return second; 14. } public void setSecond(IntWritable second) { this.second = second; 15. } 16. public void write(DataOutput out) throws IOException { first.write(out); second.write(out); 17. } 18. public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); 19. } 20. public int hashCode() { return first.hashCode() * 163 + second.hashCode(); 21. } public boolean equals(Object o) { if (o instanceof IntPair) { IntPair tp = (IntPair) o; return first.equals(tp.first) && second.equals(tp.second); 22. } return false; 23. } public String toString() { return first + "\t" + second; 24. } 25. public int compareTo(Object o) { IntPair tp=(IntPair) o; int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; 26. } return second.compareTo(tp.second); 27. } 28. }
完整代码
1. package mr; import java.io.IOException; 2. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 3. public class SecondarySort { static class TheMapper extends Mapper<LongWritable, Text, IntPair, 4. NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); int field1 = Integer.parseInt(fields[0]); int field2 = Integer.parseInt(fields[1]); context.write(new IntPair(field1,field2), NullWritable.get()); 5. } } 6. static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, 7. NullWritable> { 8. //private static final Text SEPARATOR = new Text("--------------------- 9. ---------------------------"); 10. @Override 11. protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); 12. } } 13. public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst().get()) % numPartitions; 14. } 15. } 16. //如果不添加这个类,默认第一列和第二列都是升序排序的。 17. //这个类的作用是使第一列升序排序,第二列降序排序 18. public static class KeyComparator extends WritableComparator { 19. //无参构造器必须加上,否则报错。 20. protected KeyComparator() { super(IntPair.class, true); 21. } 22. public int compare(WritableComparable a, WritableComparable b) { 23. IntPair ip1 = (IntPair) a; 24. IntPair ip2 = (IntPair) b; 25. //第一列按升序排序 26. int cmp = ip1.getFirst().compareTo(ip2.getFirst()); if (cmp != 0) { return cmp; 27. } 28. //在第一列相等的情况下,第二列按倒序排序 29. return -ip1.getSecond().compareTo(ip2.getSecond()); 30. } 31. } 32. //入口程序 33. public static void main(String[] args) throws Exception { 34. Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); 35. //设置Mapper的相关属性 36. job.setMapperClass(TheMapper.class); 37. //当Mapper中的输出的key和value的类型和Reduce输出 //的key和value的类型相同时,以下两句可以省略。 38. //job.setMapOutputKeyClass(IntPair.class); 39. //job.setMapOutputValueClass(NullWritable.class); 40. FileInputFormat.setInputPaths(job, new Path(args[0])); 41. //设置分区的相关属性 42. job.setPartitionerClass(FirstPartitioner.class); 43. //在map中对key进行排序 44. job.setSortComparatorClass(KeyComparator.class); 45. //job.setGroupingComparatorClass(GroupComparator.class); 46. //设置Reducer的相关属性 47. job.setReducerClass(TheReducer.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); 48. FileOutputFormat.setOutputPath(job, new Path(args[1])); 49. //设置Reducer数量 int reduceNum = 1; if(args.length >= 3 && args[2] != null){ reduceNum = Integer.parseInt(args[2]); 50. } job.setNumReduceTasks(reduceNum); job.waitForCompletion(true); 51. } 52. }
输入数据
1. 输入数据如下:secsortdata.txt ('\t'分割)(数据放在/root/data/6 目录下): 2. 7 444 3. 3 9999 4. 7 333 5. 4 22 6. 3 7777 7. 7 555 8. 3 6666 9. 6 0 10. 3 8888 11. 4 11
在 master 上执行对 hdfs 上的文件/user/mapreduce/secsort/out/part-r-00000 内容查看的操作
[root@master hadoop]# bin/hadoop fs -cat /user/mapreduce/secsort/out/p*
bin/hadoop fs -cat /user/mapreduce/secsort/out/p*
如图 所示:
总结
通过实验,我们可以了解到MR 默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在 reduce 阶段排序收集过来的 values,但是,如果有数量巨大的 values 可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到 MR 计算过程之中,而不是单独来做。