《MapReduce设计模式》一1.4 Hadoop示例:单词计数

简介:

本节书摘来异步社区《MapReduce设计模式》一书中的第1章,第1.4节,作者: 【美】Donald Miner , Adam Shook 译者: 徐钊 , 赵重庆 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

1.4 Hadoop示例:单词计数

在介绍完MapReduce的整个处理过程之后,让我们来看一个简单的示例:单词计数(Word Count)。“单词计数”程序是一个典型的MapReduce示例,因为它既简单,又很适合使用MapReduce高效地处理。很多人会抱怨说“单词计数”作为示例已经被用过太多次了,希望本书后面的内容能弥补这一点!

在这个特定的示例中,我们将对StackOverflow网站上用户提交的评论进行单词计数。网页中Text域的内容将被抽取出来并做一些预处理,然后我们再计算每个词出现的次数。这个数据集中的示例记录如下:

<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" 
CreationDate="2011-07-30T07:29:33.343" UserId="831878" />

这条记录是StackOverflow的第8 189 677条评论,贴子数为6 881 722,用户数是831 878。PostId的数量和UserId数量作为外键可以和数据集中的其他部分数据进行关联。我们将在本书的第5章介绍如何实现这种关联。

我们分析的第一块代码是驱动程序(driver)部分。驱动程序的作用将MapReduce作业的所有组件组合起来然后提交执行。这些代码一般都是通用的并且被作为“通用模板”。在后面介绍的编程模式中你会看到,我们大部分的驱动程序都是相同的。

下面这些代码演变自Hadoop Core代码中的“Word Count”示例。

import java.io.IOException;   
import java.util.StringTokenizer;   
import java.util.Map;  
import java.util.HashMap;  

import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;   
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;   

import org.apache.commons.lang.StringEscapeUtils;   

public class CommentWordCount {  

  public static class WordCountMapper  
      extends Mapper<Object, Text, Text, IntWritable> {  
           ...   
  }  

  public static class IntSumReducer  
      extends Reducer<Text, IntWritable, Text, IntWritable> {  
            ...   
  }  
  public static void main(String[] args) throws Exception {   
    Configuration conf = new Configuration();  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();   
    if (otherArgs.length != 2) {  
      System.err.println("Usage: CommentWordCount <in><out>";
      System.exit(2);  
    }  

    job job = new Job(conf, "StackOverflow Comment Word Count");   
    job.setJarByClass(CommentWordCount.class);   
    job.setMapperClass(WordCountMapper.class);   
    job.setCombinerClass(IntSumReducer.class);   
    job.setReducerClass(IntSumReducer.class);   
    job.setOutputKeyClass(Text.class);   
    job.setOutputValueClass(IntWritable.class);   
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));   
    System.exit(job.waitForCompletion(true) ? 0 : 1);  
  }   
}

驱动程序的作用是协调整个任务。main函数中的前几行代码都是在解析命令行输入参数。然后,开始设置job对象的参数,包括计算过程中用到的类以及输入、输出路径。这就是驱动程序的全部!最重要的一点是要确保设置的类名和你编写的类名是一致的,并且输出的键、值类型和mapper定义的一致。

在后面介绍的不同模式中,上述代码里唯一会变化的是job.setCombinerClass方法。某些情况下,因为reducer的特性,combiner将不会被用到。在另外一些情况下,combiner类将不同于reducer类。不过在“单词计数”程序中,使用combiner会非常高效,并且启用起来非常简单。

接下来看mapper代码是如何解析和准备文本的。当标点符号和随机文本被清理掉后,文本字符串将被分割成一个单词列表。然后,产生的中间键是每个单词,其对应的值为“1”,这表示这个单词已出现过一次。即使一个单词在一条记录中出现了两次,输出的依然是键=该单词、值=1,不过会有两个这样的键/值对,这些键/值对将在后面处理。最终,所有这些键对应的值汇总求和就能得到每个单词出现的总次数。

public static class WordCountMapper  
      extends Mapper<Object, Text, Text, IntWritable> {  
  private final static IntWritable one = new IntWritable(1);  
  private Text word = new Text();  

  public void map(Object key, Text value, Context context)   
              throws IOException, InterruptedException {  
    // Parse the input string into a nice map  
    Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());   

    // Grab the "Text" field, since that is what we are counting over  
    String txt = parsed.get("Text");  

    // .get will return null if the key is not there  
    if (txt == null) {   
       // skip this record  
          return;   
    }  

    // Unescape the HTML because the data is escaped.  
    txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase());  

    // Remove some annoying punctuation  
    txt = txt.replaceAll("'", ""); // remove single quotes (e.g., can't)   
    txt = txt.replaceAll("[^a-zA-Z]", " "); // replace the rest with a space  

    // Tokenize the string by splitting it up on whitespace into   
    // something we can iterate over,  
    // then send the tokens away  
    StringTokenizer itr = new StringTokenizer(txt);  
    while (itr.hasMoreTokens()) {  
      word.set(itr.nextToken());   
      context.write(word, one);  
    }   
  }  
}

第一个函数MRDPUtils.transformXmlToMap是一个辅助函数,它按照通用的方式逐条解析StackOverflow数据。我们后面的示例中会经常用到它。其基本原理是读入一条StackOverflow的XML(这是一种很容易预测的格式)文件中的记录,然后将XML的属性和其值保存在一个Map中。

接下来,请注意WordCountMapper类。这部分代码会比驱动程序稍微复杂些。我们会看到大部分工作是在mapper中完成的,因此,第一个需要重点关注的就是它的父类类型:

Mapper

mapper输入的键、值数据类型是在作业配置的FileInputFormat中定义的。默认实现是TextInputFormat。TextInputFormat的键是LongWritable对象,表示截至目前从文件中读入的字节数;其值是Text对象,表示从文本中读入的一行记录。如果使用不同的输入格式,那么很可能需要改变这些键/值的数据类型。

直到调用StringTokenizer之前,所做的事情都是清理字符串。首先,由于原始数据中的字符串是按照XML的格式存储的,因此需要先将字符串提取出来。然后,再剔除那些无用的标点符号。例如,Hadoop!和Hadoop?应该等同于Hadoop。最后,对于每一个token(即记号),将会输出该记号和数字1,即表明该记号出现了1次。框架将会接下来对这些键/值对进行混排和排序,然后交由reduce任务处理。

最后,我们来分析reducer代码,这部分代码相对简单。对于每个键分组都会调用reduce函数,在本例中键为单词。然后,reduce函数将迭代处理该键对应的所有值,即将值进行求和,在本例中值为单词的出现次数。得到的结果就是该单词出现的总次数。

public static class IntSumReducer  
      extends Reducer<Text, IntWritable, Text, IntWritable> {  
  private IntWritable result = new IntWritable();  

  public void reduce(Text key, Iterable<IntWritable> values,   
          Context context) throws IOException, InterruptedException {  
    int sum=0;  
    for (IntWritable val : values) {  
       sum += val.get();  
    }  

     result.set(sum);  
     context.write(key, result);  
  }  
}

在本例的mapper中,输入、输出类型是通过父类模板定义的。和mapper一样,reducer定义中也包括以下四个类型:输入键、输入值、输出键和输出值。输入键和输入值的数据类型必须和mapper的输出键及输出值类型一致。输出键、输出值数据类型必须和作业配置的FileOutputFormat一致。在本例中,使用默认的TextOutputFormat格式,TextOutputFormat可以把任意两个Writable对象作为输出。

reduce函数的很多签名有别于map函数:reduce函数有一个Iterator(迭代器),它包含的是所有的值,而不是单个值。这意味着你可以通过迭代的方式处理一个键所对应的所有值,而不是一次只能处理一个。对于绝大部分MapReduce作业中的reducer来说,键通常都是非常重要的,但mapper中的输入键则不然。

所有传给context.write的数据最终都会写到一个输出文件中。每个reducer将创建一个文件,因此,如果想要把结果合并到一起,则还需要在最后增加一个合并它们的处理步骤。

现在我们已经掌握了一个简单的示例,下面让我们深入学习一些设计模式!

相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
78 2
|
15天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
104 3
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
50 1
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
54 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
103 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
47 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
57 0
|
4月前
|
设计模式 存储 Java
掌握Java设计模式的23种武器(全):深入解析与实战示例
掌握Java设计模式的23种武器(全):深入解析与实战示例
|
4月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
638 0