MapReduce单词计数示例
输入
hello world our world
hello bigdata real bigdata
hello hadoop great hadoop
hadoopmapreduce
输出
bigdata 2
great 1
hadoop 3
hello 3
mapreduce 1
our 1
real 1
world 2
代码运行
MapReduce运行流程:input、split、map、shuffle、reduce、output
1.默认情况下,分片个数与数据块个数一致
2.一个分片对应一个Map
3.Map与Reduce读取与输出的数据均为键值对
4.Shuffle阶段能够按键对数据进行分组、排序
MapReduce中的数据类型
实现Mapper类
实现Reducer类
实现Driver类
1. package mr; 2. 3. import java.io.IOException; 4. import java.util.StringTokenizer; 5. 6. import org.apache.hadoop.conf.Configuration; 7. import org.apache.hadoop.fs.Path; 8. import org.apache.hadoop.io.IntWritable; 9. import org.apache.hadoop.io.Text; 10. import org.apache.hadoop.mapreduce.Job; 11. import org.apache.hadoop.mapreduce.Mapper; 12. import org.apache.hadoop.mapreduce.Reducer; 13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 15. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 16. 17. public class WordCount { 18. // map方法,划分一行文本,读一个单词写出一个<单词,1> 19. public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { 20. private final static IntWritable one = new IntWritable(1); 21. private Text text = new Text(); 22. @Override 23. protected void map(Object key, Text value, 24. Mapper<Object, Text, Text, IntWritable>.Context context) 25. throws IOException, InterruptedException { 26. // 示例输入 key-->0 values-->"hello python hello hadoop" 27. // 对应输出 <'hello', 1> <'python', 1> <'hello', 1> <'hadoop', 1> 28. StringTokenizer sti = new StringTokenizer(value.toString()); 29. while (sti.hasMoreTokens()) { 30. text.set(sti.nextToken()); 31. context.write(text, one); 32. } 33. } 34. } 35. 36. // 定义reduce类,对相同的单词,把它们中的VList值全部相加 37. public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> { 38. private IntWritable result1 = new IntWritable(); 39. @Override 40. protected void reduce(Text key, Iterable<IntWritable> values, 41. Reducer<Text, IntWritable, Text, IntWritable>.Context context) 42. throws IOException, InterruptedException { 43. // 示例输入 key-->Hello values--><1,1,1,1> 44. // 对应输出 key-->Hello value-->4 45. int sum = 0; 46. for(IntWritable val:values){ 47. sum += val.get(); 48. } 49. result1.set(sum); 50. context.write(key, result1); 51. } 52. } 53. 54. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 55. Configuration conf = new Configuration(); //创建配置类 56. Job job = Job.getInstance(conf, "Word-Count204"); //实例化Job类 57. 58. job.setJarByClass(WordCount.class); //设置主类名 59. 60. TextInputFormat.setInputPaths(job, new Path(args[0])); //设置待输入文件的位置 61. job.setInputFormatClass(TextInputFormat.class); //指定使用字符串输入格式类 62. 63. job.setMapperClass(MyMapper.class); //指定使用自定义Map类 64. job.setMapOutputKeyClass(Text.class); //指定Map类输出的,K类型,(如果同Reduce类的输出可省略) 65. job.setMapOutputValueClass(IntWritable.class); //指定Map类输出的,V类型,(如果同Reduce类的输出可省略) 66. 67. job.setReducerClass(MyReduce.class); //指定使用自定义Reduce类 68. job.setOutputKeyClass(Text.class); //指定Reduce类输出的,K类型 69. job.setOutputValueClass(IntWritable.class); //指定Reduce类输出的,V类型 70. //job.setNumReduceTasks(Integer.parseInt(args[2])); //指定Reduce个数 71. 72. job.setOutputFormatClass(TextOutputFormat.class); //指定使用默认输出格式类 73. TextOutputFormat.setOutputPath(job, new Path(args[1])); //设置输出结果文件位置 74. 75. System.exit(job.waitForCompletion(true)? 0:1); //提交任务并监控任务状态,等待任务完成 76. } 77. } 78.
总结
Ø创建Job对象
Ø设置主类
Ø设置输入
Ø设置Mapper类、设置输出的键和值类型
Ø设置Reducer类、设置输出的键和值类型
Ø设置输出
Ø提交任务