MapReduce 简介
MapReduce 核心思想
Hadoop MapReduce 是一个编程框架,它可以轻松地编写应用程序,以可靠的、容错的方式处理大量的数据(数千个节点)。
正如其名,MapReduce 的工作模式主要分为 Map 阶段和 Reduce 阶段。
一个 MapReduce 任务(Job)通常将输入的数据集分割成独立的块,这些块被 map 任务以完全并行的方式处理。框架对映射(map)的输出进行排序,然后将其输入到 reduce 任务中。通常,作业的输入和输出都存储在文件系统中。框架负责调度任务、监视任务并重新执行失败的任务。
在 Hadoop 集群中,计算节点一般和存储节点相同,即 MapReduce 框架和 Hadoop 分布式文件系统均运行在同一组节点上。这种配置允许框架有效地调度已经存在数据的节点上的作业,使得跨集群的带宽具有较高的聚合度,能够有效利用资源。
MapReduce 框架由单个主节点(Master)的 ResourceManager、每个从节点(Slave) NodeManager 和每个应用程序的 MRAppMaster 组成。
在处于最低程度时,应用程序通过实现适当的接口以及抽象类来指定输入输出的位置、提供 map 和 reduce 函数。在实现过程中还会涉及到作业配置等参数的设置。
在编程框架完善并打包之后,Hadoop 的作业客户端(job client)可以将作业(一般是 jar 包或者可执行文件)和配置项提交给 ResourceManager 。后者负责将作业代码和配置项分发给从节点(Slave),之后负责作业的调度和监视,同时也向作业客户端提供状态和诊断信息。
- Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)
- Adaminstration Service: 队列、节点、Client 权限管理
- ApplicationMasterService: 注册、终止 ApplicationMaster, 获取 ApplicationMaster 的资源申请或取消的请求,并将其异步地传给 Scheduler, 单线程处理
- ApplicationMaster Liveliness Monitor: 接收 ApplicationMaster 的心跳消息,如果某个 ApplicationMaster 在一定时间内没有发送心跳,则被任务失效,其资源将会被回收,然后 ResourceManager 会重新分配一个 ApplicationMaster 运行该应用(默认尝试 2 次)
- Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息
- NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的 Container 都标记成无效,也不会调度任务到该节点运行
- ApplicationManager: 管理应用程序,记录和管理已完成的应用
- ApplicationMaster Launcher: 一个应用提交后,负责与 NodeManager 交互,分配 Container 并加载 ApplicationMaster,也负责终止或销毁
- YarnScheduler: 资源调度分配, 有 FIFO(with Priority),Fair,Capacity 方式
- ContainerAllocationExpirer: 管理已分配但没有启用的 Container,超过一定时间则将其回收
MapReduce 编程框架
你可能想知道 MapReduce 究竟处理什么形式的数据。事实上, MapReduce 框架只对 <key, value> 形式的键值对进行处理。
换言之,该框架会将任务的输入当成一组 <key, value> 键值对,最后也会生成一组 <key, value> 键值对作为结果。其中的 key 和 value 可以根据具体问题将其理解为不同的类型。
key 和 value 的类必须由框架来完成序列化,因此我们需要做的就是实现其中的可写接口(Writable)。此外,对于其中的一些关键类还必须实现 WritableComparable 接口,以便于框架对其进行排序。在讲到具体的实现时我们会提到上述知识点。
综上所述,一个 MapReduce 作业从输入到输出的过程中,经历了以下过程。
首先是一些说明:
- <k1, v1>:输入的原始数据,即使不是 K/V 形式的,你也可以在代码中对其进行格式上的调整。
- <k3, v3>:输出的计算结果。
因此,这个过程主要经历了以下几步:
<k1, v1> -> Map -> <k2, v2> -> Combine -> <k2, v2> -> Reduce -> <k3, v3>
下面我们将对这个过程中涉及到的关键概念进行讲解。同时,也请阅读 Google 发表的关于 MapReduce 原理的论文。这篇文章对 MapReduce 的工作原理进行了深入的讲解,可以帮助理解各个流程之间的关系。
扩展链接:
- MapReduce: Simplified Data Processing on Large Clusters
Mapper
Map 阶段是一个独立的任务,由 Mapper 负责将输入的键值对(以下简称为“输入对”)映射到一组中间键值对。转换后的中间键值对(以下简称为“中间对”)不需要与原始的输入有相同的类型。给定的输入键值对可以映射到零或多个输出键值对(以下简称为“输出对”)。
例如,<A, B> 这个键值对可以在 Map 阶段分别映射为 <A, 1> 和 <B, 1> 两个键值对。
总体而言,Mapper 的实现过程主要是向 Job.setmapperclass(Class) 方法传递具体的作业代码。输入的数据被划分为等长的小数据块,称为输入分片(InputSplit)。然后框架为该作业中 InputSplit 所包含的每个键值对调用 map(WritableComparable,Writable, Context) 方法。
当然,我们在应用程序的开发中可以重写 cleanup(Context) 方法来执行一些清理工作。
刚刚提到输出对不需要与输入对具有相同的类型,给定的输入对可以映射到零或多个输出对。输出对将会在 context.write(WritableComparable, Writable) 方法调用时被收集在一起。
此外,应用程序可以使用 Counter (计数器)来报告一些统计数据。
所有与给定输出键有关的中间值都将被框架分组,并传递给 Reducer 以产生最终的输出结果。我们可以通过 Job.setGroupingComparatorClass(Class) 方法来指定 Comparator (比较器)以控制分组过程。这稍微有些复杂,在初次学习时可以仅作了解。
随后,Mapper 的输出将在 Reducer 中进行排序和分区。**分区的总数量与作业的 Reducer 任务数相同。**用户可以通过实现自定义 Partitioner (分区器)来控制哪些键(及其相关的键值对)由哪些 Reducer 处理。
用户可以通过 Job.setCombinerClass(Class) 方法来指定 Combiner (组合器)。这个 Combiner 用于在本地执行中间输出的聚合操作,这将有利于减少从 Mapper 传输到 Reducer 的数据量。本地执行是指在单个节点范围内进行处理,减少数据量是指跨节点的数据传输量变少。
在上述过程中产生的中间输出总是以 (key-len, key, value-len, value) 的格式存储,并且这些中间输出是已经排过序的。我们可以在应用程序中通过设置 Configuration 对象的参数来利用 CompressionCodec 工具控制是否要压缩中间输出,以及如何压缩中间输出。
面试时容易问到一个问题是如何计算 Map 的数量:一个简单的回答是其数量通常由输入数据的总大小决定,即输入文件块的总数。
对于 Map 来说,正常情况下的并行计算量是每个节点大约 10-100 个 Map 。对于某些计算量较小的 Map 任务,这个数量可以设置为 300 个。任务的设置过程也是需要耗费时间的,所以在海量数据背景下,最好让 Map 的处理时间不低于一分钟,以最大化地利用集群资源。
因此,如果预计输入数据有 10TB ,而块大小设置为 128MB ,那默认情况下将会产生 82000 个 map 。如果我们通过 Configuration.set(MRJobConfig。NUM_MAPS, int) 设置了更高的值,则这个数量将会根据设置再次改变。
Reducer
Map 过程(包括了 Combine)之后就是 Reduce 过程。Reduce 过程利用 Reducer 共享键来减少中间值集合的规模。
在作业中,reducer 的数量可以通过 job.setNumReduceTasks(int) 方法设置。
具体而言,Reducer 在实现时需要我们通过 Job.setReducerClass(Class) 方法来传递具体的作业代码,并可以通过重写覆盖该方法来完成初始化。框架将对分组后的输入中的每个 <key, (含有多个value的列表)> 调用 reduce(WritableComparable, Iterable<Writable>, Context) 方法。同样的,我们可以在应用程序中重写 cleanup(Context) 方法来执行清理工作。
Reducer 有三个主要阶段,分别是:shuffle 、 sort 和 reduce 。
- Shuffle:也称之为混洗,mapper 的排序输出是 Reducer 的输入。在这个阶段,MapReduce 框架通过 HTTP 协议获取所有 Mapper 输出的相关分区。
- Sort:排序。由于不同的 Mapper 可能输出相同的键,因此在此阶段,框架将对 Reducer 的输入通过键进行分组。需要特别说明的是,Shuffle(混洗)和 Sort (排序)阶段可能会同时发生。当获取到 map 的输出时,这两个阶段就会被合并进行。
- Reduce:规约。在这个阶段将会对每一个 <key, (含有多个value的列表)> 记录都调用 reduce(WritableComparable, Iterable<Writable>, Context) 方法进行处理。作为最后一个阶段,Reduce 任务的输出通常都通过 Context.write(WritableComparable, Writable) 方法写入到文件系统(无论是 HDFS 还是本地文件系统)。在这个过程中,Reducer 的输出是无序的,我们也可以使用 Counter 来统计一些数据。
Partitioner
Partitioner 用于对键的空间进行分区。
具体来说,它控制了中间 Map 输出的键的分区,其目的是为了优化并减少计算量。键(或键的子集)用于派生分区,通常由散列函数派生。分区的总数与作业的 Reduce 任务数相同。HashPartitioner 是 MapReduce 使用的默认分区程序。
Counter
Counter (计数器)是 MapReduce 应用程序报告统计数据的一种工具。在 Mapper 和 Reducer 的具体实现中,可以利用 Counter 来报告统计信息。
除了需要我们自己实现的部分,MapReduce 也自带了非常实用的 Mapper 、 Reducer 和 Partitioner 库,你可以在官方文档的 Package Summary 章节了解他们的具体用法。
WordCount 实验
在了解了 MapReduce 的工作原理之后,我们将再次使用 WordCount 来帮助理解它的工作过程。只是,这一次将由我们自己编写 WordCount 的代码来实现相关的逻辑。
MapReduce 作业的编译与执行
准备
准备输入文件
在集群启动完成后,我们上传一些数据作为稍后词频统计的“原料”。
请在终端中输入以下命令来完成上传,这一次上传的文件是 Linux 系统 /etc 目录下的 protocols 协议文件。
hdfs dfs -put /etc/protocols /user/hadoop
创建文件
数据准备好之后,在终端中输入以下命令,新建一个名为 WordCount.java 的文件。
# 请在 hadoop 用户的主目录新建该文件 cd /home/hadoop touch WordCount.java
使用 Vim 编辑器打开并编辑该文件。你也可以使用其他的编辑器。
vim /home/hadoop/WordCount.java
在 WordCount.java 文件内先输入以下代码,完成整个 WordCount 程序的大体框架。
// 以下是本程序会用到的相关的包,先引入 import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // 注意类名要与文件名的大小写一致 public class WordCount { // 自定义的 TokenizerMapper 类将继承自 Mapper 类,以实现相关的接口和方法 // 在 Map 阶段将会执行其中的作业逻辑 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ } // 自定义的 IntSumReducer 类将继承自 Reducer 类,以实现相关的接口和方法 // 在 Reduce 阶段将会执行其中的作业逻辑 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { } // main 方法是整个程序的入口,在这里涉及到作业(Job)的各项设置 public static void main(String[] args) throws Exception { } }
编写 Mapper
map 方法是 Mapper 的具体实现。在下面这段代码中,Mappter 通过 map 方法一次只处理读入的一行数据。数据是由指定的 TextInputFormat 提供的。然后,它通过 StringTokenizer 将每一行的内容分割为由空格分隔的单词,形成 <word, 1> 形式的键值对。即每个单词作为键,对应一个计数的值。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }
举个例子,如果我们在文件中读入了一行数据:
AAA BBB uiuing AAA
那么在 map 方法的处理后,将形成如下所示的键值对:
<AAA, 1> <BBB, 1> <uiuing, 1> <AAA, 1>
理解了 map 的工作机制之后,现在我们来完善 Mapper 类的代码。
请在 /home/hadoop/WordCount.java 文件中将 TokenizerMapper 的代码按照如下的样例补充完整。相关的讲解将以注释给出。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ // 在 MapReduce 框架中,基本数据类型都封装成了 Writable 类型 // 因此 int 类型对应于 IntWritable 类型,在初始化时将其声明为静态常量是为了方便地使用 1 的值 private final static IntWritable one = new IntWritable(1); // 声明一个 Text 类型的私有成员变量 word private Text word = new Text(); // map 方法的写法是标准格式,可以参考官方文档理解各个参数的含义 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 从 value 中读入数据并按照空格分隔 StringTokenizer itr = new StringTokenizer(value.toString()); // 将每个分隔形成的单词组装成键值对 while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
编写 Reduce
reduce 方法是 Reducer 的具体实现。在 WordCount 的例子中,通过如下所示的 reduce 方法可以实现汇总键值对中的值,从而达到统计每个键出现次数的目的。这也是词频统计的核心所在。
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }
上一步 map 阶段我们得到了如下所示的键值对:
<AAA, 1> <BBB, 1> <uiuing, 1> <AAA, 1>
那么在 reduce 阶段,这些键值对中键相同的值将会被累加,聚合成如下所示的键值对:
<AAA, 2> <BBB, 1> <uiuing, 1>
理解了以上逻辑之后,请在 /home/hadoop/WordCount.java 文件中将 IntSumReducer 类的代码补充完整:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { // 声明一个 IntWritable 类型值用于存放累加结果 private IntWritable result = new IntWritable(); // reduce 方法的写法也是参考官方文档进行的,相关的参数可以查阅官方文档进行理解 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 进行值的累加操作 int sum = 0; for (IntWritable val : values) { sum += val.get(); } // 将 int 基本类型通过 set 方法赋予到结果中 result.set(sum); // 写入上下文中进行保存 context.write(key, result); } }
编写任务驱动
我们知道任何编程框架都会有一个主入口,就像 Java 程序以 main 方法作为主入口一样。程序执行的第一步就是执行 main 方法中的逻辑。
因此,我们将 main 方法理解为任务驱动,即设置和发起任务的角色。
此外,在 WordCount 中,我们还可以复用 Reducer 的实现逻辑,并且指定一个 Combiner (组合器)。因此,在对键进行排序之后,每个 map 的输出都通过本地的 Combiner (根据作业配置与还原器相同)传递给本地的聚合过程,这样做可以减少后续 reduce 过程跨节点的数据传输量。
请在 /home/hadoop/WordCount.java 文件中将 main 方法的逻辑代码补充完整。相关的介绍将以注释形式给出。
// throws Exception 可以合理地抛出 MapReduce 任务中产生的异常,便于我们进行调试 public static void main(String[] args) throws Exception { // 程序的第一步是声明并初始化 Configuration 对象用于设置作业的相关运行参数 Configuration conf = new Configuration(); // 设置作业的配置参数和名称 Job job = Job.getInstance(conf, "word count"); // 将 WordCount 类作为运行的入口 job.setJarByClass(WordCount.class); // 通过 setMapperClass 方法告诉集群应当在 map 阶段执行哪些逻辑 job.setMapperClass(TokenizerMapper.class); // 通过 setCombinerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑,此处复用了 Reducer 的逻辑,用于在本地进行部分结果的累加 // 这个步骤不是必须的 job.setCombinerClass(IntSumReducer.class); // 通过 setReducerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑 job.setReducerClass(IntSumReducer.class); // 设置输出结果中键的数据类型 job.setOutputKeyClass(Text.class); // 设置输出结果中值的数据类型 job.setOutputValueClass(IntWritable.class); // 利用 main 函数的第 1 个输入参数获取输入数据的路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 利用 main 函数的第 2 个输入参数获取输出数据的路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // job.waitForCompletion(true) 相当于开启执行任务的开关,执行到此处时一个 MapReduce 应用才会真正地开始计算 // 使用 System.exit 方法来告知程序运行的状态 System.exit(job.waitForCompletion(true) ? 0 : 1); }
MapReduce 作业的编译和执行
编译 Jar 包
完成所有代码的编写之后,我们需要将 WordCount.java 文件编译为 Jar 包。
在终端中输入以下命令来完成编译:
cd ~ # 导入生效 export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar # 编译 hadoop com.sun.tools.javac.Main /home/hadoop/WordCount.java # 打包 jar cf wc.jar WordCount*.class
执行作业
Jar 包编译完成之后,就可以开始执行作业了。使用 hadoop jar 命令,并附带上 jar 包的路径、程序主类名称和输入输出文件的路径。
请在终端中输入以下命令来提交作业执行。
hadoop jar /home/hadoop/wc.jar WordCount /user/hadoop/protocols /user/hadoop/wordcount_output
观察作业进度
在 MapReduce 的工作过程中,我们可以在终端中的输出看到作业的进度:
如果看到 map 100% reduce 100% 的提示之后,则表明计算已经完成,在输出的提示中还包含了 Counter 、 MapReduce 框架以及 Shuffle 的一些统计信息,这些信息可以帮助我们分析作业的执行质量,也可以定位问题。
查看结果
最后,我们来查看计算的结果。
在终端中输入以下命令来查看计算结果:
hdfs dfs -cat /user/hadoop/wordcount_output/part-r-*
可以看到当前文件中出现最多的单词(字符)是 #:
作为改进,你可以在框架实现过程中通过正则表达式来过滤掉非单词的字符。
学习 MapReduce 方法
尽管前文较为详细地介绍了 MapReduce 的工作原理,并且以 WordCount 为例实现了一个 MapReduce 应用程序,但对初学者而言,如何将这样的编程思想应用在解决实际问题之中呢?
先不要忙着上手写自己的代码,学习一个编程框架最好的途径就是熟读文档和样例代码。
目前 Hadoop 在不同版本的官方文档(Documentation)中均提供了 MapReduce 框架的介绍,
cd ~ wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1-src.tar.gz
tar -zxvf hadoop-2.6.1-src.tar.gz
tree hadoop-2.6.1-src/hadoop-mapreduce-project/hadoop-mapreduce-examples/
这其中既有我们学习过的 WordCount 案例,也还有一些其他的示例程序。下面选择其中一些进行介绍:
- **AggregateWordCount:**基于聚合的 MapReduce 程序,用于对输入文件中的单词进行计数。
- **AggregateWordHistogram:**基于聚合的计算输入文件中单词的直方图的 MapReduce 程序。
- **BaileyBorweinPlouffe:**使用 Bailey-Borwein-Plouffe 算法来计算精确的圆周率数字的 MapReduce 程序。
- **Grep:**在输入中用于统计符合正则表达式匹配数量的 MapReduce 程序。
- **Join:**对已排序并处于相同分区的数据集进行连接的 MapReduce 程序。
- **MultiFileWordCount:**从多个文件中计算单词词频的 MapReduce 程序。
- **Pentomino:**模拟瓷砖铺设的 MapReduce 程序,可用来寻找解决方法。
- **Pi:**使用拟蒙特卡罗方法估计圆周率的 MapReduce 程序。
- **RandomTextWriter:**该程序可向每个节点写入 10GB 的随机文本数据。
- **RandomWriter:**该程序可向每个节点写入 10GB 的随机数据。
- **SecondarySort:**一个二级排序的示例。
- **Sort:**对数据进行排序的示例。
- **Sudoku:**求解数独问题的程序。
- **TeraGen:**为 Terasort 生成数据。
- **TeraSort:**运行 Terasort 测试。这是一个非常出名的测试,用于验证分布式计算框架对 1TB 数据排序的速度,你可以访问 Sort Benchmark 了解更多信息。
- **TeraValidate:**查看 Terasort 的结果。
- **WordMean:**用于计算输入文件中单词的平均长度。
- **WordMedian:**计算输入文件中单词的中位数长度的 MapReduce 程序。
- **WordStandardDeviation:**一个 MapReduce 程序,它计算输入文件中单词长度的标准偏差。
在以上示例代码文件中,每一个代码文件都详尽地注释了其工作原理。通过阅读计算框架或示例程序的源代码,将能够在短时间内迅速加深理解。你可以选择以上任意感兴趣的源代码进行阅读,此处不再详细展开。