本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.4.2节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
2.4.2 MapReduce原理
1.通俗理解MapReduce原理
现在你接到一个任务,给你10本长篇英文小说,让你统计这10本书中每一个单词出现的次数。这便是Hadoop编程中赫赫有名的HelloWorld程序:词频统计。这个任务的结果形式如表2-6所示。
即在这10本书中a共出现了12300次,ai共出现了63次……依次计算出每一个单词出现多少次。天啊,这个工作必须由专业人士做呀,自己做的话还不累死呀。这时你可以把这个工作外包给一支职业分布式运算工程队做。
分布式运算工程队中按岗位有Mapper、Mapper助理Comb-iner、Mapper助理InputFormat、Mapper助理Patitioner、运输负责Shuffle、Reducer、Reducer助理Sort、Reducer助理OutputFormat。除了Combiner是非必需人员外,其他岗位都是必需的。下面描述一下这个工程队是怎么做这项工作的。
首先把这10本书分别分到10个Mapper手中。Mapper助理InputFormat负责从书中读取记录,Mapper负责记录怎么解析重新组织成新的格式。然后Mapper把自己的处理结果排好序后放到书旁边,等待Shuffle取走结果。Shuffle把取到的结果送给Reducer助理Sort,由Sort负责把所有Mapper的结果排好序,然后送给Reducer来进行汇总,以得到最终的结果;最后,由Reducer助理Outputformat记录到规定位置并存档。
下面说明什么时候需要Combiner。Maper助理InputForormat从书中一行行读取记录,给到Mapper,Mapper从Inputformat的记录中解析出一个个单词,并进行记录。Mapper处理的结果形如“a出现了一次,a出现了一次,ai出现了一次……zhe出现了一次”。工作一段时间后发现负责搬运工作的Suffle有点吃不消,这时就用到Mapper助理Combiner了。由Combiner对的输出结果进行短暂的汇总,把Mapper的结果处理成形如“书本一中单词a共出现1500次,ai出现了14次,are出现了80次……”这样Shuffle的压力顿时减轻了许多。
对于每个岗位工程队都是有默认时限的。但如果默认时限不能满足需求,也可以对工作量进行自定义。
上面的过程描述了一个MapReduce工程队是如何进行配合工作的。这个过程与MapReduce分布式运算是基本对应的。理解了上面的过程也就大概理解了Hadoop的Map-Reduce过程了。
2. MapReduce过程解析
MapReduce过程可以解析为如下所示:
1)文件在HDFS上被分块存储,DataNode存储实际的块。
2)在Map阶段,针对每个文件块建立一个map任务,map任务直接运行在DataNode上,即移动计算,而非数据,如图2-30所示。
3)每个map任务处理自己的文件块,然后输出新的键值对,如图2-31所示。
4)Map输出的键值对经过shuffle/sort阶段后,相同key的记录会被输送到同一个reducer中,同时键是排序的,值被放入一个列表中,如图2-32所示。
5)每个reducer处理从map输送过来的键值对,然后输出新的键值对,一般输出到HDFS上。
3.单词计数源码解析
上面的分析都是建立在理论基础上的,这样的分析有利于编写MapReduce程序。但是如果要实际编写一个MapReduce的简单程序,还是不够的,需要具体看示例代码。这里直接以官网提供的example代码中的WordCount程序作为示例,进行代码级别分析和说明。
首先,在Hadoop的发行版中找到对应的代码。在解压下载的Hadoop2.6.0的发行版目录中,找到hadoop-2.6.0sharehadoopmapreducesources目录,该目录下面有一个hadoop-mapreduce-examples-2.6.0-sources.jar文件,使用压缩文件解压缩该文件,在目录org/apache/Hadoop/examples中即可找到WordCount.java文件,如图2-33所示。
找到该文件后,使用文本软件打开,或拷贝到Eclipse工程中查看,如代码清单2-23所示。
代码清单2-23 WordCount.java代码
package org.apache.hadoop.examples;
/**
省略代码
*/
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2){
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
下面对该代码进行分析。
(1)应用程序Driver分析
这里的Driver程序主要指的是main函数,在main函数里面设置MapReduce程序的一些初始化设置,并提交任务等待程序运行完成,如代码清单2-24所示。
代码清单2-24 WordCount main 函数代码
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2){
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
下面,针对WordCount main函数代码进行分析说明。
1)第1部分Configuration代码,初始化相关Hadoop配置。在2.4.1节中也看到过,这里直接新建一个实例即可。如果是在实际的应用程序中,可以通过conf.set()函数添加必要参数,即可直接运行。
2)第2部分代码新建Job,并设置主类。这里的Job实例需要把Configuration的实例传入,后面的“word count”是该MapReduce任务的任务名(注意这里的方式使用的还是不推荐的MRV1的版本,推荐使用MRV2的版本)。
3)第3部分代码设置Mapper、Reducer、Combiner,这里的设置代码都是固定写法,里面的类名可以改变,一般情况下里面的类名为实际任务Mapper、Reducer、Combiner。
4)第4部分代码设置输出键值对格式。在MapReduce任务中涉及三个键值对格式:Mapper输入键值对格式,Mapper输出键值对格式,Reducer输入键值对格式,Reducer输出键值对格式。当Mapper输出键值对格式和Reducer输出键值对格式一样的时候,可以只设置输出键值对的格式(这个其实就是Reducer输出的键值对格式),否则需要设置“job.setMapOutputKeyClass(Text.class); job.setMapOutputKeyClass(IntWritable.class);”。
5)第5、第6部分代码设置输入、输出路径,其实还有输入、输出文件格式的设置,只是这里没有设置,如果不是默认格式,那么还是需要设置的。
6)最后部分代码是提交MapReduce任务运行(是固定写法),并等待任务运行结束。
综合上面的描述,这里给出MapReduce任务初始化以及提交运行的一般代码,如代码清单2-25所示。
代码清单2-25 MapReduce通用Driver代码
Configuration conf = new Configuration();
Job job =Job.getInstance(conf);
job.setMapperClass(AverageMapper.class);
job.setReducerClass(AverageReducer.class);
job.setCombinerClass(Reducer.class);
job.setMapOutputKeyClass(Writable.class);
job.setMapOutputValueClass(Writable.class);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
在实际应用程序中,一般是直接从应用程序提交任务到Hadoop集群的,而非使用yarn jar的方式提交jar包来运行算法的。这里给出通用的提交应用程序到Hadoop集群的代码作为参考,不过在此之前需要简要分析下Configuration这个类。
Configuration是Hadoop系统的基础公共类,可以通过这个类的API加载配置信息,同时在初始化这个类的实例的时候也可以设置Hadoop集群的配置,从而直接针对某个Hadoop集群提交任务,其API如图2-34所示。
Configuration各种set API中用得比较多的还是第1个,通用的提交应用程序到Hadoop集群的代码也是使用的第1个,见代码清单2-26。
代码清单2-26 通用提交应用程序到Hadoop集群代码
Configuration configuration = new Configuration();
configuration.setBoolean("mapreduce.app-submission.cross-platform", true);
// 配置使用跨平台提交任务
configuration.set("fs.defaultFS", "hdfs://node1:8020"); // 指定namenode
configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
configuration.set("yarn.resourcemanager.address", "node1:8032");
// 指定resourcemanager
configuration.set("yarn.resourcemanager.scheduler.address", "node1:8030");
// 指定资源分配器
configuration.set("mapreduce.jobhistory.address", "node2:10020");
// 指定historyserver
configuration.set("mapreduce.job.jar","C:\\Users\\fansy\\Desktop\\jars\\
import2hbase.jar");//设置包含Mapper、Reducer的jar包路径
上面的值需要根据实际的Hadoop集群对应配置进行修改。
同时,通过Configuration的set方法也可以实现在Mapper和Reducer任务之间信息共享。比如在Driver中设置一个参数number,在Mapper或Reducer中取出该参数,如代码清单2-27所示(注意,在MapReduce程序中是不能通过全局static变量获取值的,这点需要特别注意)。
代码清单2-27 通过Configuration在Driver和Mapper/Reducer传递参数
// 在Driver中设置参数值
Configuration conf = new Configuration();
conf.setInt(“number”,10);
// 在Reducer中取出参数值
public class MyReducer extends Reducer<K2,V2,K3,V3>{
public void setup(Context context){
int number = context.getConfiguration().getInt(“number”);
}
}
(2)Mapper分析
对于用户来说,其实比较关心的是Mapper的map函数以及Reducer的reduce函数,这里先分析Mapper的map函数,如代码清单2-28所示。
代码清单2-28 WordCount Mapper代码
public static class TokenizerMapper extends Mapper<Object, Text, Text, Int-Writable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
}
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
1)自定义Mapper需要继承Mapper,同时需要设置输入输出键值对格式,其中输入键值对格式是与输入格式设置的类读取生成的键值对格式匹配,而输出键值对格式需要与Driver中设置的Mapper输出的键值对格式匹配。
2)Mapper有3个函数,分别是setup、map、cleanup,其中实现setup、cleanup函数不是必须要求,Mapper任务启动后首先执行setup函数,该函数主要用于初始化工作;针对每个键值对会执行一次map函数,所有键值对处理完成后会调用cleanup函数,主要用于关闭资源等操作。
3)实现的map函数就是与实际业务逻辑挂钩的代码,主要由用户编写,这里是单词计数程序,所以这里的逻辑是把每个键值对(键值对组成为:<行的偏移量,行字符串>)的值(也就是行字符串)按照空格进行分割,得到每个单词,然后输出每个单词和1这样的键值对。
(3)Reducer分析
Reducer针对Mapper的输出进行整合,同时输入给Reducer的是键值对组,所以其实Reducer中的reduce函数就是针对每个键的所有汇总值的处理。Reducer代码如代码清单2-29所示。
代码清单2-29 WordCount Reducer代码
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
1)自定义Reducer同样需要继承Reducer,与Mapper相同,需要设置输入输出键值对格式,这里的输入键值对格式需要与Mapper的输出键值对格式保持一致,输出键值对格式需要与Driver中设置的输出键值对格式保持一致。
2)Reducer也有3个函数:setup、cleanup、reduce,其中setup、cleanup函数其实和Mapper的同名函数功能一致,并且也是setup函数在最开始执行一次,而cleanup函数在最后执行一次。
3)用户一般比较关心reduce函数的实现,这个函数里面写的就是与业务相关的处理逻辑了,比如,这里单词计数,就针对相同键,把其值的列表全部加起来进行输出。