本节书摘来异步社区《MapReduce设计模式》一书中的第1章,第1.4节,作者: 【美】Donald Miner , Adam Shook 译者: 徐钊 , 赵重庆 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。
1.4 Hadoop示例:单词计数
在介绍完MapReduce的整个处理过程之后,让我们来看一个简单的示例:单词计数(Word Count)。“单词计数”程序是一个典型的MapReduce示例,因为它既简单,又很适合使用MapReduce高效地处理。很多人会抱怨说“单词计数”作为示例已经被用过太多次了,希望本书后面的内容能弥补这一点!
在这个特定的示例中,我们将对StackOverflow网站上用户提交的评论进行单词计数。网页中Text域的内容将被抽取出来并做一些预处理,然后我们再计算每个词出现的次数。这个数据集中的示例记录如下:
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?"
CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
这条记录是StackOverflow的第8 189 677条评论,贴子数为6 881 722,用户数是831 878。PostId的数量和UserId数量作为外键可以和数据集中的其他部分数据进行关联。我们将在本书的第5章介绍如何实现这种关联。
我们分析的第一块代码是驱动程序(driver)部分。驱动程序的作用将MapReduce作业的所有组件组合起来然后提交执行。这些代码一般都是通用的并且被作为“通用模板”。在后面介绍的编程模式中你会看到,我们大部分的驱动程序都是相同的。
下面这些代码演变自Hadoop Core代码中的“Word Count”示例。
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.lang.StringEscapeUtils;
public class CommentWordCount {
public static class WordCountMapper
extends Mapper<Object, Text, Text, IntWritable> {
...
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
...
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: CommentWordCount <in><out>";
System.exit(2);
}
job job = new Job(conf, "StackOverflow Comment Word Count");
job.setJarByClass(CommentWordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
驱动程序的作用是协调整个任务。main函数中的前几行代码都是在解析命令行输入参数。然后,开始设置job对象的参数,包括计算过程中用到的类以及输入、输出路径。这就是驱动程序的全部!最重要的一点是要确保设置的类名和你编写的类名是一致的,并且输出的键、值类型和mapper定义的一致。
在后面介绍的不同模式中,上述代码里唯一会变化的是job.setCombinerClass方法。某些情况下,因为reducer的特性,combiner将不会被用到。在另外一些情况下,combiner类将不同于reducer类。不过在“单词计数”程序中,使用combiner会非常高效,并且启用起来非常简单。
接下来看mapper代码是如何解析和准备文本的。当标点符号和随机文本被清理掉后,文本字符串将被分割成一个单词列表。然后,产生的中间键是每个单词,其对应的值为“1”,这表示这个单词已出现过一次。即使一个单词在一条记录中出现了两次,输出的依然是键=该单词、值=1,不过会有两个这样的键/值对,这些键/值对将在后面处理。最终,所有这些键对应的值汇总求和就能得到每个单词出现的总次数。
public static class WordCountMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// Parse the input string into a nice map
Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
// Grab the "Text" field, since that is what we are counting over
String txt = parsed.get("Text");
// .get will return null if the key is not there
if (txt == null) {
// skip this record
return;
}
// Unescape the HTML because the data is escaped.
txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase());
// Remove some annoying punctuation
txt = txt.replaceAll("'", ""); // remove single quotes (e.g., can't)
txt = txt.replaceAll("[^a-zA-Z]", " "); // replace the rest with a space
// Tokenize the string by splitting it up on whitespace into
// something we can iterate over,
// then send the tokens away
StringTokenizer itr = new StringTokenizer(txt);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
第一个函数MRDPUtils.transformXmlToMap是一个辅助函数,它按照通用的方式逐条解析StackOverflow数据。我们后面的示例中会经常用到它。其基本原理是读入一条StackOverflow的XML(这是一种很容易预测的格式)文件中的记录,然后将XML的属性和其值保存在一个Map中。
接下来,请注意WordCountMapper类。这部分代码会比驱动程序稍微复杂些。我们会看到大部分工作是在mapper中完成的,因此,第一个需要重点关注的就是它的父类类型:
Mapper
mapper输入的键、值数据类型是在作业配置的FileInputFormat中定义的。默认实现是TextInputFormat。TextInputFormat的键是LongWritable对象,表示截至目前从文件中读入的字节数;其值是Text对象,表示从文本中读入的一行记录。如果使用不同的输入格式,那么很可能需要改变这些键/值的数据类型。
直到调用StringTokenizer之前,所做的事情都是清理字符串。首先,由于原始数据中的字符串是按照XML的格式存储的,因此需要先将字符串提取出来。然后,再剔除那些无用的标点符号。例如,Hadoop!和Hadoop?应该等同于Hadoop。最后,对于每一个token(即记号),将会输出该记号和数字1,即表明该记号出现了1次。框架将会接下来对这些键/值对进行混排和排序,然后交由reduce任务处理。
最后,我们来分析reducer代码,这部分代码相对简单。对于每个键分组都会调用reduce函数,在本例中键为单词。然后,reduce函数将迭代处理该键对应的所有值,即将值进行求和,在本例中值为单词的出现次数。得到的结果就是该单词出现的总次数。
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
在本例的mapper中,输入、输出类型是通过父类模板定义的。和mapper一样,reducer定义中也包括以下四个类型:输入键、输入值、输出键和输出值。输入键和输入值的数据类型必须和mapper的输出键及输出值类型一致。输出键、输出值数据类型必须和作业配置的FileOutputFormat一致。在本例中,使用默认的TextOutputFormat格式,TextOutputFormat可以把任意两个Writable对象作为输出。
reduce函数的很多签名有别于map函数:reduce函数有一个Iterator(迭代器),它包含的是所有的值,而不是单个值。这意味着你可以通过迭代的方式处理一个键所对应的所有值,而不是一次只能处理一个。对于绝大部分MapReduce作业中的reducer来说,键通常都是非常重要的,但mapper中的输入键则不然。
所有传给context.write的数据最终都会写到一个输出文件中。每个reducer将创建一个文件,因此,如果想要把结果合并到一起,则还需要在最后增加一个合并它们的处理步骤。
现在我们已经掌握了一个简单的示例,下面让我们深入学习一些设计模式!