MapReduce Execution Steps
Map tasks
Read each line of the input file, parse it into a <key, value> pair, and call the map function
The map logic processes each key and value and produces new key/value pairs
Partition the map output by key (see the partitioner sketch after this list)
Reduce tasks
Copy the map task outputs to the reduce nodes, then merge and sort them
The reduce logic processes each key and its grouped values, producing new key/value pairs
Save the results to a file
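The partitioning step decides which reducer receives each key. Below is a minimal sketch of a custom partitioner for the word-count types used later in this section; the class name WordPartitioner is hypothetical, and the body simply mirrors what Hadoop's default HashPartitioner already does:

package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example; Hadoop's default HashPartitioner behaves the same way
public class WordPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then bucket by key hash;
        // the same word therefore always lands on the same reducer
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered on the job with job.setPartitionerClass(WordPartitioner.class); without that call, Hadoop falls back to HashPartitioner.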
wordcount example
Prepare the input file
vim word.txt
hello Jack
hello Tom
hello Jimi
hello Mili
hello Make
Upload the file
hadoop fs -put word.txt /word.txt
hadoop fs -ls /    # check that the file is there
Run the job
cd hadoop-2.8.5/share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /word.txt /wcout    # the output directory /wcout must not already exist
View the job results
hadoop fs -ls /wcout
hadoop fs -cat /wcout/part-r-00000    # each reducer writes one part-r-NNNNN file
Jack    1
Jimi    1
Make    1
Mili    1
Tom     1
hello   5
Java example
mapper
package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Extends Mapper to implement the map computation.
 * The keys and values passed around must be serializable so they can travel over the network.
 */
public class MapDemo extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Receive one line of input
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit <word, 1> for every word
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
reducer
package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Extends Reducer to implement the reduce computation.
 */
public class ReduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Counter for this key
        long count = 0;
        // Sum up the 1s emitted for this word
        for (LongWritable counter : values) {
            count += counter.get();
        }
        // Emit <word, total count>
        context.write(key, new LongWritable(count));
    }
}
driver
package mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Counts word occurrences.
 * Run with: hadoop jar hdfsdemo.jar <input path> <output path>
 * Pass the actual input and output paths as arguments.
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Build the Job object
        Job job = Job.getInstance(new Configuration());
        // Note: the class containing the main method
        job.setJarByClass(WordCount.class);

        // Input file path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Mapper settings
        job.setMapperClass(MapDemo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reducer settings
        job.setReducerClass(ReduceDemo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output file path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}
Package the WordCount class into a jar, upload it to the server, and run:
hadoop jar hdfsdemo.jar /word.txt /out
View the output file; it matches the output of Hadoop's built-in wordcount example:
hadoop fs -cat /out/part-r-00000
Jack    1
Jimi    1
Make    1
Mili    1
Tom     1
hello   5
Summary
Import the dependency jars from:
hadoop-2.8.5/share/hadoop/mapreduce/
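As one way to compile against those jars and produce the hdfsdemo.jar used above (a sketch; it assumes the hadoop client is on the PATH and the three source files sit under mr/ to match their package declaration):

mkdir -p classes
# 'hadoop classpath' prints the jars shipped with the installation
javac -cp "$(hadoop classpath)" -d classes mr/*.java
jar cf hdfsdemo.jar -C classes .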
Writing a custom job
Analyze the business logic and determine the input and output formats
Extend Mapper
Extend Reducer
Assemble the Mapper and Reducer through a Job object
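One optional refinement these notes don't cover: because the reduce logic here only sums values, the same class can also run as a map-side combiner, shrinking the data that gets merged and shuffled to the reducers (the step described under the reduce task above). A sketch, added to the driver before submitting the job:

// Optional: reuse ReduceDemo as a combiner; valid here because summing is associative
job.setCombinerClass(ReduceDemo.class);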