MapReduce按照两个字段对数据进行排序

简介:

按照k2排序,要求k2必须是可以比较的,即必须实现WritableComparable接口。

但是如果还想让别的字段(比如v2中的一些字段)参与排序怎么办?

需要重新定义k2....把需要参与排序的字段都放到k2中.

这块用代码实现:

假如数据现在的结构是

3       3

3       2

3       1

2       2

2       1

1       1

看代码:

复制代码
 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.NullWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.io.WritableComparable;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16 
17 public class TwoIntSortApp {
18     
19     public static void main(String[] args) throws Exception {
20         Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName());
21         job.setJarByClass(TwoIntSortApp.class);
22         FileInputFormat.setInputPaths(job, args[0]);
23         
24         job.setMapperClass(TwoIntSortMapper.class);
25         job.setMapOutputKeyClass(TwoInt.class);
26         job.setMapOutputValueClass(NullWritable.class);
27         
28         job.setReducerClass(TwoIntSortReducer.class);
29         job.setOutputKeyClass(TwoInt.class);
30         job.setOutputValueClass(NullWritable.class);
31 
32         FileOutputFormat.setOutputPath(job, new Path(args[1]));
33         job.waitForCompletion(true);        
34     }
35     
36     public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable>{
37         TwoInt k2 = new TwoInt();
38         @Override
39         protected void map(LongWritable key, Text value,
40                 Mapper<LongWritable, Text, TwoInt, NullWritable>.Context context)
41                 throws IOException, InterruptedException {
42             String[] splited = value.toString().split("\t");
43             k2.set(splited[0],splited[1]);
44             context.write(k2, NullWritable.get());
45             System.out.println("Mapper-----第一个数:"+k2.first+" 第二个数:"+k2.second);
46         }
47     }
48     
49     public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable>{
50         int i=1;
51         @Override
52         protected void reduce(TwoInt k2, Iterable<NullWritable> arg1,
53                 Reducer<TwoInt, NullWritable, TwoInt, NullWritable>.Context context)
54                 throws IOException, InterruptedException {
55             context.write(k2,NullWritable.get());
56             System.out.println("调用次数"+(i++));
57             System.out.println("Reducer-----第一个数:"+k2.first+" 第二个数:"+k2.second);
58         }
59     }
60     
61     public static class TwoInt implements WritableComparable<TwoInt>{
62         int first;
63         int second;
64         public void write(DataOutput out) throws IOException {
65             out.writeInt(first);
66             out.writeInt(second);
67         }
68         
69         public void set(String s1,String s2){
70             this.first = Integer.parseInt(s1);
71             this.second = Integer.parseInt(s2);
72         }
73 
74         public void readFields(DataInput in) throws IOException {
75             this.first = in.readInt();
76             this.second = in.readInt();
77             
78         }
79 
80         public int compareTo(TwoInt o) {
81             int r1 = this.first - o.first;
82             if(r1 < 0){
83                 return -1;
84             }else if(r1 > 0){
85                 return 1;
86             }
87             int r2 = this.second - o.second;
88             return  (r2 < 0 ? -1 : (r2 > 0 ? 1 : 0)); 
89         }
90         
91         @Override
92         public String toString() {
93             return this.first+"\t"+this.second;
94         }
95     }
96 }
复制代码

 //==============================================================

在job上设置Combiner类...

        
        job.setCombinerClass(TwoIntSortReducer.class);//设置Combiner类
        job.setGroupingComparatorClass(MyGroupingCompartor.class);//设置自定义的分组类

  

复制代码
 1     public static class MyGroupingCompartor extends WritableComparator{
 2         @Override
 3         public int compare(WritableComparable a, WritableComparable b) {
 4             TwoInt aa = (TwoInt)a; 
 5             TwoInt bb = (TwoInt)b; 
 6             return aa.first-bb.first<0?-1:(aa.first-bb.first>0?1:0);//只要是第一列相同的就认为是一个分组.
 7             /*
 8              * 1    1
 9              * 2    1
10              * 2    2
11              * 3    1
12              * 3    2
13              * 3    3
14              * 这样就分成了三组
15              */
16         }
17     }
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/5678175.html,如需转载请自行联系原作者

相关文章
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
79 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
34 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0
|
3月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
44 0
|
6月前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
分布式计算 Hadoop 大数据
MapReduce 案例之数据去重
MapReduce 案例之数据去重
287 0
|
6月前
|
分布式计算
如何在MapReduce中处理非结构化数据?
如何在MapReduce中处理非结构化数据?
73 0
|
6月前
|
分布式计算 Java Hadoop
MapReduce编程:数据过滤保存、UID 去重
MapReduce编程:数据过滤保存、UID 去重
101 0
|
6月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
51 0