// MySortWordCount: Hadoop MapReduce word count followed by a sort on word frequency.
package  com.mzsx.hadoop;
import  java.io.IOException;
import  java.util.Random;
import  java.util.StringTokenizer;
import  org.apache.hadoop.conf.Configuration;
import  org.apache.hadoop.fs.FileSystem;
import  org.apache.hadoop.fs.Path;
import  org.apache.hadoop.io.IntWritable;
import  org.apache.hadoop.io.Text;
import  org.apache.hadoop.io.WritableComparable;
import  org.apache.hadoop.mapreduce.Job;
import  org.apache.hadoop.mapreduce.Mapper;
import  org.apache.hadoop.mapreduce.Reducer;
import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public  class  MySortWordCount {
     public  static  class  MyMapper  extends
             Mapper<Object, Text, Text, IntWritable> {
         private  final  static  IntWritable one =  new  IntWritable( 1 ); // 类似于int类型
         private  Text word =  new  Text();  // 可以理解成String类型
         public  void  map(Object key, Text value, Context context)
                 throws  IOException, InterruptedException {
             System.err.println(key +  ","  + value);
             // 默认情况下即根据空格分隔字符串
             String tmp=value.toString();
             tmp=tmp.replace( '\'' ' ' );
             tmp=tmp.replace( '.' ' ' );
             tmp=tmp.replace( ',' ' ' );
             tmp=tmp.replace( ':' ' ' );
             tmp=tmp.replace( '!' ' ' );
             tmp=tmp.replace( ';' ' ' );
             tmp=tmp.replace( '?' ' ' );
             tmp=tmp.replace( '`' ' ' );
             tmp=tmp.replace( '"' ' ' );
             tmp=tmp.replace( '&' ' ' );
             tmp=tmp.replace( '(' ' ' );
             tmp=tmp.replace( ')' ' ' );
             tmp=tmp.replace( '-' ' ' );
             StringTokenizer itr =  new  StringTokenizer(tmp);
             while  (itr.hasMoreTokens()) {
                 word.set(itr.nextToken());
                 context.write(word, one);
             }
         };
     }
     // Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     public  static  class  MyReducer  extends
             Reducer<Text, IntWritable, Text, IntWritable> {
         private  IntWritable result =  new  IntWritable();
         protected  void  reduce(Text key, Iterable<IntWritable> values,
                 Context context)  throws  IOException, InterruptedException {
             System.err.println(key +  ","  + values);
             int  sum =  0 ;
             for  (IntWritable val : values) {
                 sum += val.get();
             }
             result.set(sum);
             ;
             context.write(key, result); // 这是最后结果
         };
     }
       
     public  static  class  SortMapper  extends  Mapper<Object, Text, IntWritable,Text>{
           
         public  void  map(Object key, Text value, Context context)  throws  IOException, InterruptedException {
              
             IntWritable times =  new  IntWritable( 1 );
             Text password =  new  Text();
             String eachline=value.toString();
             String[] eachterm =eachline.split( "\t" );
             password.set(eachterm[ 0 ]);
             times.set(Integer.parseInt(eachterm[ 1 ]));
             context.write(times,password);
               
         }
       }
          
       public  static  class  SortReducer  extends  Reducer<IntWritable,Text,IntWritable,Text> {
           private  Text password =  new  Text();
           public  void  reduce(IntWritable key,Iterable<Text> values, Context context)  throws  IOException, InterruptedException {
             for  (Text val : values) {
                 password.set(val);
                 context.write(key,password);
             }
         }
     }
       private  static  class  IntDecreasingComparator  extends  IntWritable.Comparator {
           public  int  compare(WritableComparable a, WritableComparable b) {
             //return -super.compare(a, b);
               return  super .compare(a, b);
           }
             
           public  int  compare( byte [] b1,  int  s1,  int  l1,  byte [] b2,  int  s2,  int  l2) {
               //return -super.compare(b1, s1, l1, b2, s2, l2);
               return  super .compare(b1, s1, l1, b2, s2, l2);
           }
       }
     public  static  void  main(String[] args)  throws  Exception {
         // 声明配置信息
         Configuration conf =  new  Configuration();
         // 声明Job
         Job job =  new  Job(conf,  "Word Count" );
         // 设置工作类
         job.setJarByClass(MySortWordCount. class );
         // 设置mapper类
         job.setMapperClass(MyMapper. class );
         // 可选
         job.setCombinerClass(MyReducer. class );
         // 设置合并计算类
         job.setReducerClass(MyReducer. class );
         // 设置key为String类型
         job.setOutputKeyClass(Text. class );
         // 设置value为int类型
         job.setOutputValueClass(IntWritable. class );
         //job.setInputFormatClass(KeyValueTextInputFormat.class);
         // 设置或是接收输入输出
         /*FileInputFormat.setInputPaths(job, new Path("/user/root/aoman.txt"));
         FileOutputFormat.setOutputPath(job, new Path("/user/root/r3"));
         // 执行
         System.exit(job.waitForCompletion(true) ? 0 : 1);*/
           
         //定义一个临时目录,先将词频统计任务的输出结果写到临时目录中, 下一个排序任务以临时目录为输入目录。
         FileInputFormat.addInputPath(job,  new  Path( "/user/root/aoman.txt" ));
         Path tempDir =  new  Path( "MySortWordCount-temp-"  + Integer.toString( new  Random().nextInt(Integer.MAX_VALUE)));
         FileOutputFormat.setOutputPath(job, tempDir);
           
         if (job.waitForCompletion( true ))
         {
             Job sortJob =  new  Job(conf,  "csdnsort" );
             sortJob.setJarByClass(MySortWordCount. class );
               
             FileInputFormat.addInputPath(sortJob, tempDir);
               
             sortJob.setMapperClass(SortMapper. class );
             FileOutputFormat.setOutputPath(sortJob,  new  Path( "/user/root/sort1" ));
               
             sortJob.setOutputKeyClass(IntWritable. class );
             sortJob.setOutputValueClass(Text. class );
               
             sortJob.setSortComparatorClass(IntDecreasingComparator. class );
    
             FileSystem.get(conf).deleteOnExit(tempDir);
               
             System.exit(sortJob.waitForCompletion( true ) ?  0  1 );
         }
           
         System.exit(job.waitForCompletion( true ) ?  0  1 );
           
           
     }
}