一、MapReduce编程模型概述
MR是分布式计算模型
MR整个并行计算过程中会抽象出两个函数:
map():它是对我们独立元素中的每一个元素进行并行计算操作的函数
reduce():它是对我们独立元素中的数据进行合并
一个简单 的MR程序,我们只需要指定map() reduce() input output,剩下的事情交个我们的框架来完成
关于MR的数据处理流程:
数据处理的阶段 input -> map -> reduce ->output
任务Job = Map+Reduce
Map的输出是Reduce的输入
数据处理过程中,MR要求的数据格式是以
是Map的输入,是Map的输出。
是Reduce的输入,是Reduce的输出。
MapReduce编程模型图:
- k2 = k3,v3是一个集合,v3的元素就是v2,表示为v3=list(v2)
- 所有的输入和输出的数据类型必须是Hadoop的数据类型
- MapReduce处理的数据都是HDFS的数据(或HBase)
一个小案例,解释上面的流程
数据源:
a good beginning is half the battle where there is a will there is a way
map阶段:
输入是,其中k1默认是指偏移量,类型是LongWritable,v1是指一行的文本内容,类型是Text。因为要统计单词的数量,所以k2是指单词,类型是Text。v2用数字1表示,即一个单词记一数,类型是IntWritable。
并行读取文本,获取,进行map操作,输出
读取第一行:
a good beginning is half the battle
在用户自己实现的map函数中,取到的v1即这一行的文本,以空格为分隔符对字符串进行拆分,获取到每个单词,并以的格式输出,得到:
<a,1> <good,1> <beginning,1> <is,1> <half,1> <the,1> <battle,1>
读取第二行:
where there is a will there is a way
得到:
<where,1> <there,1> <is,1> <a,1> <will,1> <there,1> <is,1> <a,1> <way,1>
Reduce阶段:
按MapReduce的理论得知,在Reduce阶段,输入,k3与k2一致,表示单词,类型是Text。v3是v2的集合,即List(v2)。同时k3默认已经按字典顺序排序,因此是:
<a,(1,1,1)> <battle,(1)> <beginning,(1)> <good,(1)> <half,(1)> <is,(1,1,1)> <the,(1)> <there,(1,1)> <way,(1)> <where,(1)> <will,(1)>
在用户实现自己的reduce函数中,读入并统计v3集合中每个单词的总数,并将输出,得到:
<a,3> <battle,1> <beginning,1> <good,1> <half,1> <is,3> <the,1> <there,2> <way,1> <where,1> <will,1>
整个MR的大致过程如下:
二、WordCount案例
统计一批英文文章中,每个单词出现的总次数。
现在有一个输入文件"Gone With The Wind",这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。MR原理图
Map阶段
每一个block对应一个分片split (默认split与block一一对应)。
每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。
以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是"Dear Bear River",则kv对是(0, “Dear Bear River”)),作为map()的参数传入,调用map()。
map()方法。将value当前行内容按空格切分,得到三个单词Dear|Bear|River,然后将每个单词变成键值对(Dear, 1)|(Bear, 1)|(River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程)。
block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。
Reduce阶段
reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。
以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。
reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。
三、MapReduce基于WordCount编程
(1)MapReduce基于本地调试
准备数据源
java python hadoop spark spark java hadoop hive hbase hbase hive spark java python hadoop linux unix java python spark spark storm flink java flink hue kafka kafka java python zookeeper zookeeper spark hive hbase hive
基于WordCount业务逻辑编写:
Map数据处理:
<0,hadoop spark> -> split -> <hadoop,1> <spark,1>
Reduce数据处理:
<hadoop,List(1)> <spark,List(1,1,1,1)> <java,List(1,1)
总结:k1进map k2出,k3进reduce k4出
基于本地测试,去掉core-site.xml中 “hadoop.tmp.dir”配置项
<property> <name>hadoop.tmp.dir</name> <value>/opt/modules/hadoop/tmp</value> </property>
示例代码:
package com.kfk.hadoop.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */ public class WordCountMR { // 1.map public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ // 创建k2,v2对象 private final Text mapOutputKey = new Text(); private final IntWritable mapOutputValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据 System.out.println("KeyIn: "+key+" ValueIn: "+value); // 每一行数据 String lineValue = value.toString(); // 将数据按照空格分开 String[] strs = lineValue.split(" "); // 输出<k2,v2> for (String str:strs){ mapOutputKey.set(str); context.write(mapOutputKey,mapOutputValue); } } } // 2.reduce public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ // 创建v4对象 private final IntWritable outputValue = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 对v3求和 int sum = 0; for (IntWritable value:values){ sum += value.get(); } outputValue.set(sum); // 输出<k4,v4> context.write(key,outputValue); } } // driver public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1) get conf Configuration configuration = new Configuration(); // 2) create job Job job = Job.getInstance(configuration,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 3.1) input,指定job的输入 Path path = new Path(args[0]); FileInputFormat.addInputPath(job,path); // 3.2) map,指定job的mapper和输出的类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 3.3) reduce,指定job的reducer和输出类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 3.4) output,指定job的输出 Path outpath = new Path(args[1]); FileOutputFormat.setOutputPath(job,outpath); // 4) commit,执行job boolean isSuccess = job.waitForCompletion(true); // 如果正常执行返回0,否则返回1 return (isSuccess) ? 0 : 1; } public static void main(String[] args) { // 添加输入,输入参数 args = new String[]{ "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt", "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output" }; WordCountMR wordCountMR = new WordCountMR(); try { // 判断输出的文件存不存在,如果存在就将它删除 Path fileOutPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(new Configuration()); if (fileSystem.exists(fileOutPath)){ fileSystem.delete(fileOutPath,true); } // 调用run方法 int status = wordCountMR.run(args); // 退出程序 System.exit(status); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
将数据源上传到hdfs上:
bin/hdfs dfs -mkdir /user/caizhengjie/datas
bin/hdfs dfs -put /opt/datas/wordcount.txt /user/caizhengjie/datas
创建输出的文件路径
bin/hdfs dfs -mkdir /user/caizhengjie/mr
这是测试Hadoop中自带的mapreduce程序
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.3.jar wordcount /user/caizhengjie/datas/wordcou
查看结果
bin/hdfs dfs -text /user/caizhengjie/mr/output/par*
where applicable 1 flink 2 hadoop 3 hbase 3 hive 4 hue 1 java 6 kafka 2 linux 1 python 4 spark 6 storm 1 unix 1 zookeeper 2
(2)MapReduce基于YARN调试
将编写好的程序,上传到虚拟机,并测试:
bin/hadoop jar /opt/jars/bigdata_study.jar /user/caizhengjie/datas/wordcount.txt /user/caizhengjie/mr/output
bin/hdfs dfs -text /user/caizhengjie/mr/output/par*
会出现上面的同样的结果。
四、MapReduce模板优化
根据上面代码,我们可以对MapReduce的编程模板进行优化,成为通用编程模版:
package com.kfk.hadoop.mr; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */ public class WordCountUpMR extends Configured implements Tool { /** * map */ public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ // 创建k2,v2对象 private final Text mapOutputKey = new Text(); private final IntWritable mapOutputValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据 System.out.println("Map in == KeyIn: "+key+" ValueIn: "+value); // 每一行数据 String lineValue = value.toString(); // 将数据按照空格分开 String[] strs = lineValue.split(" "); // 输出<k2,v2> for (String str:strs){ mapOutputKey.set(str); context.write(mapOutputKey,mapOutputValue); // 打印出<k2,v2> System.out.println("Map out == KeyOut: "+mapOutputKey+" ValueOut: "+mapOutputValue); } } } /** * reduce */ public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ // 创建v4对象 private final IntWritable outputValue = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 打印出k3,v3集合,用于本机测试 List<IntWritable> list = Lists.newArrayList(values); System.out.println("Reduce in == KeyIn: "+key+" ValueIn: "+list); // 对v3求和 int sum = 0; for (IntWritable value:list){ sum += value.get(); } outputValue.set(sum); // 输出<k4,v4> context.write(key,outputValue); // 打印出k4,v4的值 System.out.println("Reduce out == KeyOut: "+key+" ValueOut: "+outputValue); } } /** * run * @param args * @return * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1) get conf Configuration configuration = this.getConf(); // 2) create job Job job = Job.getInstance(configuration,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 3.1) input,指定job的输入 Path path = new Path(args[0]); FileInputFormat.addInputPath(job,path); // 3.2) map,指定job的mapper和输出的类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 3.3) reduce,指定job的reducer和输出类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 3.4) output,指定job的输出 Path outpath = new Path(args[1]); FileOutputFormat.setOutputPath(job,outpath); // 4) commit,执行job boolean isSuccess = job.waitForCompletion(true); // 如果正常执行返回0,否则返回1 return (isSuccess) ? 0 : 1; } public static void main(String[] args) { // 添加输入,输入参数 args = new String[]{ "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt", "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output" }; // WordCountUpMR wordCountUpMR = new WordCountUpMR(); Configuration configuration = new Configuration(); try { // 判断输出的文件存不存在,如果存在就将它删除 Path fileOutPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if (fileSystem.exists(fileOutPath)){ fileSystem.delete(fileOutPath,true); } // 调用run方法 int status = ToolRunner.run(configuration,new WordCountUpMR(),args); // 退出程序 System.exit(status); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
运行结果:
五、总结
WordCount程序MR运行流程:
input -> map -> reduce -> output
每个阶段的数据类型
map输入 -> <0,hadoop spark> map输出 -> <hadoop,1><spark,1> reduce输入 -> <hadoop,List(1,1)> reduce输出 -> <hadoop,2>
要知道的几个知识点:
- 第一点:默认情况下,Map输入对格式
key:偏移量 value:文本中的一行值
- 第二点:map与reduce之间的shuffle过程
map -> partition -> sort -> group -> reduce *分区: 决定map输出的<key,value>,发送给哪个reduce去处理 *排序: map -> 默认按单词首字母排序 *分组: 将相同key的value组合在一起,放在List中 eg: (keyIn:spark ValueIn:[1, 1, 1, 1])
- 第三点:reduce的输出结果,默认情况下,key和value作为一行数据输出key和value之间的分隔符为制表符 \t