package com.mzsx.hadoop;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySortWordCount {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1); // roughly an int
        private Text word = new Text();                            // roughly a String

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.err.println(key + "," + value); // debug output
            // Replace punctuation with spaces; StringTokenizer then splits on whitespace by default.
            String tmp = value.toString();
            tmp = tmp.replace('\'', ' ');
            tmp = tmp.replace('.', ' ');
            tmp = tmp.replace(',', ' ');
            tmp = tmp.replace(':', ' ');
            tmp = tmp.replace('!', ' ');
            tmp = tmp.replace(';', ' ');
            tmp = tmp.replace('?', ' ');
            tmp = tmp.replace('`', ' ');
            tmp = tmp.replace('"', ' ');
            tmp = tmp.replace('&', ' ');
            tmp = tmp.replace('(', ' ');
            tmp = tmp.replace(')', ' ');
            tmp = tmp.replace('-', ' ');
            StringTokenizer itr = new StringTokenizer(tmp);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.err.println(key + "," + values); // debug output
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // final count for this word
        }
    }

    public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            IntWritable times = new IntWritable(1);
            Text password = new Text();
            // Each line of the first job's output has the form "word<TAB>count".
            String eachline = value.toString();
            String[] eachterm = eachline.split("\t");
            password.set(eachterm[0]);
            times.set(Integer.parseInt(eachterm[1]));
            // Swap key and value so the framework sorts records by count.
            context.write(times, password);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

        private Text password = new Text();

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                password.set(val);
                context.write(key, password);
            }
        }
    }

    // Negates IntWritable's natural order so the sort job emits counts in decreasing order.
    private static class IntDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static void main(String[] args) throws Exception {
        // Job configuration
        Configuration conf = new Configuration();
        // Word-count job
        Job job = new Job(conf, "Word Count");
        job.setJarByClass(MySortWordCount.class);    // job class
        job.setMapperClass(MyMapper.class);          // mapper
        job.setCombinerClass(MyReducer.class);       // optional combiner
        job.setReducerClass(MyReducer.class);        // reducer
        job.setOutputKeyClass(Text.class);           // output key: Text (String-like)
        job.setOutputValueClass(IntWritable.class);  // output value: IntWritable (int-like)
        //job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Single-job version (fixed input/output paths), kept for reference:
        /*FileInputFormat.setInputPaths(job, new Path("/user/root/aoman.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/user/root/r3"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);*/

        // Write the word-count output to a temporary directory;
        // the sort job then uses that directory as its input.
        FileInputFormat.addInputPath(job, new Path("/user/root/aoman.txt"));
        Path tempDir = new Path("MySortWordCount-temp-"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        FileOutputFormat.setOutputPath(job, tempDir);

        if (job.waitForCompletion(true)) {
            // Sort job: reads "word<TAB>count" and orders the words by decreasing count.
            Job sortJob = new Job(conf, "csdnsort");
            sortJob.setJarByClass(MySortWordCount.class);
            FileInputFormat.addInputPath(sortJob, tempDir);
            sortJob.setMapperClass(SortMapper.class);
            sortJob.setReducerClass(SortReducer.class); // identity pass-through defined above
            FileOutputFormat.setOutputPath(sortJob, new Path("/user/root/sort1"));
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            sortJob.setSortComparatorClass(IntDecreasingComparator.class);
            FileSystem.get(conf).deleteOnExit(tempDir);
            System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
        }
        // Reached only if the word-count job failed.
        System.exit(1);
    }
}
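To try this out, the class needs to be packaged into a jar and submitted to the cluster; assuming the jar is named, say, mysortwordcount.jar (the name is only an example), the chained jobs could be started with `hadoop jar mysortwordcount.jar com.mzsx.hadoop.MySortWordCount`. The first job writes its word counts to the temporary MySortWordCount-temp-* directory, and the second job reads that directory and writes the words, sorted by decreasing count, to /user/root/sort1.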
Copyright notice: this is original work; if you repost it, please credit the source, otherwise legal liability may be pursued.
This article is reposted from the 梦朝思夕 51CTO blog; original link: http://blog.51cto.com/qiangmzsx/1404661