接下来是 reduce task 逻辑:
/** * KEYIN VALUEIN 对于map 阶段输出的KEYOUT VALUEOUT * <p> * KEYOUT :是自定义 reduce 逻辑处理结果的key * VALUEOUT : 是自定义reduce 逻辑处理结果的 value */ public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <zhouq,1>,<zhouq,1>,<zhouq,2> ...... * 入参key 是一组单词的kv对 的 key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //拿到当前传送进来的 单词 // String word = key.toString(); // int count = 0; for (IntWritable value : values) { count += value.get(); } //这里的key 就是单词 context.write(key, new IntWritable(count)); } }
最后是启动类:
/** * wc 启动类 */ public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // mapreduce.framework.name 配置成 local 就是本地运行模式,默认就是local // 所谓的集群运行模式 yarn ,就是提交程序到yarn 上. 要想集群运行必须指定下面三个配置. // conf.set("mapreduce.framework.name", "yarn"); // conf.set("yarn.resoucemanager.hostname", "mini1"); //conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/"); Job job = Job.getInstance(conf); //指定本程序的jar 包 所在的本地路径 job.setJarByClass(WordCountDriver.class); //指定本次业务的mepper 和 reduce 业务类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordcountReduce.class); //指定mapper 输出的 key value 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定 最终输出的 kv 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job,new Path(args[0])); //指定job 输出的文件目录 FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean waitForCompletion = job.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); } }
配置启动类参数:填写输入目录和输出目录,注意输出目录不能存在,不然会执行失败的。
执行我们就用编辑器执行,用本地模式,不提交到hadoop 集群上,执行完成后,去到输出目录下可以看到这些文件:
然后输出一下 part-r-00000 这个文件:代码地址:https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java
最后
希望对你有帮助。后面将会去讲 MapReduce 是如何去运行的。