package com.syh;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
public class WordCountApp {

    // Mapper: splits each input line on spaces and emits a <word, 1> pair per word.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }
    // Reducer: sums the counts for each word and emits <word, total>.
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "wordcount");
        job.setJarByClass(WordCountApp.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Delete the output path if it already exists; otherwise the job fails on submission.
        Path outPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
            System.out.println("Output path already exists and has been deleted");
        }
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}