Mapper and Reducer Explained - Alibaba Cloud Developer Community


Mapper and Reducer Explained


1. Mapper/Reducer in the Old API

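Mapper and Reducer encapsulate an application's data-processing logic. To keep the interface simple, MapReduce requires all data stored in the underlying distributed file system to be interpreted as key/value pairs and handed to the map/reduce functions of the Mapper/Reducer, which produce further key/value pairs. The class hierarchies of Mapper and Reducer are very similar, so we use Mapper as the example. As the Mapper class diagram shows, a Mapper consists of three parts: initialization, the map operation, and cleanup.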

(1) Initialization
Mapper extends the JobConfigurable interface. The configure method of that interface allows a Mapper to be initialized from a JobConf parameter.

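(2) Map operation
The MapReduce framework uses the RecordReader supplied by the InputFormat to read key/value pairs one at a time from an InputSplit, and passes each pair to the following map() function: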

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;

Besides key and value, the function takes two further parameters of type OutputCollector and Reporter, which are used to emit results and to update Counter values, respectively.
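To make the roles of the four parameters concrete, here is a minimal sketch of a map() in the old-API shape. It does not use the Hadoop classes: `SketchCollector` and `SketchReporter` are hypothetical stand-ins for OutputCollector and Reporter (the reporter stand-in models only a simplified counter update), and `OldStyleTokenMapper` and `OldStyleMapDemo` are illustrative names, so the block runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputCollector: receives the emitted key/value pairs.
interface SketchCollector<K, V> {
    void collect(K key, V value);
}

// Hypothetical stand-in for Reporter: only a simplified counter update is modelled.
interface SketchReporter {
    void incrCounter(String counterName, long amount);
}

// A map() in the old-API shape: one input pair in, zero or more output pairs out.
class OldStyleTokenMapper {
    void map(Long offset, String line,
             SketchCollector<String, Integer> output,
             SketchReporter reporter) {
        for (String token : line.trim().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // blank line: nothing to emit
            }
            output.collect(token, 1);           // emit a result pair
            reporter.incrCounter("TOKENS", 1L); // update a counter
        }
    }
}

class OldStyleMapDemo {
    // Runs the mapper over one line and returns the emitted pairs as "key=value" strings.
    static List<String> run(String line, Map<String, Long> counters) {
        List<String> emitted = new ArrayList<>();
        new OldStyleTokenMapper().map(0L, line,
                (k, v) -> emitted.add(k + "=" + v),
                (name, amount) -> counters.merge(name, amount, Long::sum));
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new HashMap<>();
        System.out.println(run("to be or not to be", counters));
        System.out.println(counters);
    }
}
```

The mapper never returns a value; everything it produces flows through the collector, and everything it reports flows through the reporter. That separation is what lets the framework decide where output and statistics end up.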

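(3) Cleanup
Mapper extends the Closeable interface (which in turn extends the Closeable interface in Java IO) and thereby obtains a close method; users can implement this method to clean up a Mapper.

MapReduce provides many Mapper/Reducer implementations, but most of them are fairly simple, as shown in the figure. Their functions are: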

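ChainMapper/ChainReducer: support chained jobs.

IdentityMapper/IdentityReducer: write the input key/value to the output unchanged, without any processing.

InverseMapper: swaps the positions of key and value.

RegexMapper: matches strings against a regular expression.

TokenCountMapper: splits a string into tokens (words); can be used as the Mapper of WordCount.

LongSumReducer: groups by key and sums the long values of each group.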
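The behaviour of two of these built-ins can be illustrated without Hadoop. The sketch below only imitates what the descriptions above say (swap key and value; sum long values per key) with plain Java collections; `BuiltinBehaviourSketch` and its methods are hypothetical names, not the library classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BuiltinBehaviourSketch {
    // What an inverse mapper does: swap key and value for every record.
    static <K, V> List<Map.Entry<V, K>> inverse(List<Map.Entry<K, V>> records) {
        List<Map.Entry<V, K>> swapped = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            swapped.add(new SimpleEntry<>(r.getValue(), r.getKey()));
        }
        return swapped;
    }

    // What a long-sum reducer does: group by key and add up the long values.
    static <K extends Comparable<K>> Map<K, Long> longSum(List<Map.Entry<K, Long>> records) {
        Map<K, Long> sums = new TreeMap<>();
        for (Map.Entry<K, Long> r : records) {
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("a", 2L));
        records.add(new SimpleEntry<>("b", 5L));
        records.add(new SimpleEntry<>("a", 3L));
        System.out.println(longSum(records));
        System.out.println(inverse(records));
    }
}
```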

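A MapReduce application does not necessarily need a Mapper. The MapReduce framework provides an interface more general than Mapper: MapRunnable, as shown in the figure. Users can implement this interface to customize how the Mapper is invoked, or to implement the key/value processing logic themselves. For example, Hadoop Pipes provides its own MapRunnable implementation that sends the data directly over a socket to another process for handling. Another benefit of this interface is that it allows users to implement multithreaded Mappers.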

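As shown in the figure, MapReduce provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, with MapRunner as the default. MultithreadedMapRunner implements a multithreaded MapRunnable. By default each Mapper starts 10 threads; it is typically used for jobs that are not CPU-bound, in order to improve throughput.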
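The idea behind a multithreaded runner can be sketched with a fixed-size thread pool: records are handed to worker threads, and the collector must be thread-safe because several map calls write to it at once. This is only an illustration of the technique under those assumptions, not the MultithreadedMapRunner source; `MultithreadedRunnerSketch` and `runAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class MultithreadedRunnerSketch {
    // Applies mapFn to every record on a fixed-size thread pool and
    // returns the outputs (in no particular order).
    static <I, O> Queue<O> runAll(List<I> records, Function<I, O> mapFn, int threads) {
        Queue<O> outputs = new ConcurrentLinkedQueue<>(); // thread-safe collector
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (I rec : records) {
                pool.execute(() -> outputs.add(mapFn.apply(rec)));
            }
        } finally {
            pool.shutdown(); // accept no new work; already queued records still run
        }
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("map calls did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map calls", e);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("alpha");
        lines.add("beta");
        lines.add("gamma");
        // 10 threads, matching the default mentioned above.
        System.out.println(runAll(lines, String::length, 10));
    }
}
```

Extra threads pay off when each map call spends most of its time waiting (for example on a remote lookup); for CPU-bound map functions they mostly add contention.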

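2. Mapper/Reducer in the New API

As the figure shows, the new API differs from the old API in the following ways:

Mapper changes from an interface to a class and no longer extends the JobConfigurable and Closeable interfaces; instead, setup and cleanup methods are added directly to the class for initialization and cleanup.

Parameters are wrapped in a Context object, which makes the interface easy to extend.

The MapRunnable interface is removed and a run method is added to Mapper so that users can customize how map() is invoked; the default run implementation is the same as the run implementation of MapRunner in the old version.

In the new API, the type that Reducer uses to iterate over values becomes java.lang.Iterable, so users can traverse all values with a "foreach" loop, as shown below: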

void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
    for(VALUEIN value: values)  { // note the for-each traversal
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}
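As a small illustration of why the Iterable parameter is convenient, the sketch below sums the values of one key with a for-each loop. `ReduceSinkSketch` is a hypothetical stand-in for the Context parameter that only records what is written, and `IntSumReduceSketch` is an illustrative name; neither is a Hadoop class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Context parameter: it only records what is written.
class ReduceSinkSketch<K, V> {
    final List<String> written = new ArrayList<>();

    void write(K key, V value) {
        written.add(key + "\t" + value);
    }
}

class IntSumReduceSketch {
    // Same shape as the new-API reduce(): the values arrive as an Iterable.
    void reduce(String key, Iterable<Integer> values,
                ReduceSinkSketch<String, Integer> context) {
        int sum = 0;
        for (int value : values) { // for-each works because values is an Iterable
            sum += value;
        }
        context.write(key, sum);
    }

    public static void main(String[] args) {
        ReduceSinkSketch<String, Integer> sink = new ReduceSinkSketch<>();
        new IntSumReduceSketch().reduce("hadoop", Arrays.asList(1, 1, 1), sink);
        System.out.println(sink.written);
    }
}
```

With the old API's java.util.Iterator parameter the same loop has to be written with explicit hasNext()/next() calls.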

The complete code of the Mapper class is as follows:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} 
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper<Object, Text, Text, IntWritable>{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * 
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }
  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

As the code shows, the Mapper class defines a new class, Context, which extends MapContext.
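The setup / map-per-record / cleanup order driven by run() in the listing above can be imitated in a few lines. In this sketch `LifecycleContextSketch` is a hypothetical, heavily simplified stand-in for Mapper.Context (values only, input from an in-memory list), and the mapper classes are illustrative names; only the control flow mirrors the listing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for Mapper.Context: input comes from a list,
// and everything written is appended to a log.
class LifecycleContextSketch {
    private final Iterator<String> input;
    private String current;
    final List<String> log = new ArrayList<>();

    LifecycleContextSketch(List<String> records) {
        this.input = records.iterator();
    }

    boolean nextKeyValue() {
        if (!input.hasNext()) {
            return false;
        }
        current = input.next();
        return true;
    }

    String getCurrentValue() {
        return current;
    }

    void write(String value) {
        log.add(value);
    }
}

// Mirrors the setup / map-per-record / cleanup order of run().
class LifecycleMapperSketch {
    protected void setup(LifecycleContextSketch context) {
        context.write("setup");
    }

    protected void map(String value, LifecycleContextSketch context) {
        context.write(value); // identity by default
    }

    protected void cleanup(LifecycleContextSketch context) {
        context.write("cleanup");
    }

    public void run(LifecycleContextSketch context) {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

// A subclass overrides only map(); run() still drives the lifecycle.
class UpperCaseMapperSketch extends LifecycleMapperSketch {
    @Override
    protected void map(String value, LifecycleContextSketch context) {
        context.write(value.toUpperCase(Locale.ROOT));
    }
}
```

Because run() is an ordinary overridable method, a subclass could also replace the loop itself, for example to skip records or to dispatch map() calls to several threads.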

Let's look at the source code of the MapContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
/**
 * The context that is given to the {@link Mapper}.
 * @param <KEYIN> the key input type to the Mapper
 * @param <VALUEIN> the value input type to the Mapper
 * @param <KEYOUT> the key output type from the Mapper
 * @param <VALUEOUT> the value output type from the Mapper
 */
public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
  extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;

  public MapContext(Configuration conf, TaskAttemptID taskid,
                    RecordReader<KEYIN,VALUEIN> reader,
                    RecordWriter<KEYOUT,VALUEOUT> writer,
                    OutputCommitter committer,
                    StatusReporter reporter,
                    InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }
  /**
   * Get the input split for this map.
   */
  public InputSplit getInputSplit() {
    return split;
  }
  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }
  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }
}

MapContext extends TaskInputOutputContext. Next, look at the code of the TaskInputOutputContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/**
 * A context object that allows input and output from the task. It is only
 * supplied to the {@link Mapper} or {@link Reducer}.
 * @param <KEYIN> the input key type for the task
 * @param <VALUEIN> the input value type for the task
 * @param <KEYOUT> the output key type for the task
 * @param <VALUEOUT> the output value type for the task
 */
public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
       extends TaskAttemptContext implements Progressable {
  private RecordWriter<KEYOUT,VALUEOUT> output;
  private StatusReporter reporter;
  private OutputCommitter committer;

  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
                                RecordWriter<KEYOUT,VALUEOUT> output,
                                OutputCommitter committer,
                                StatusReporter reporter) {
    super(conf, taskid);
    this.output = output;
    this.reporter = reporter;
    this.committer = committer;
  }
  /**
   * Advance to the next key/value pair.
   * @return true if a key/value pair was read, false if the input is exhausted
   */
  public abstract 
  boolean nextKeyValue() throws IOException, InterruptedException;
  /**
   * Get the current key.
   * @return the current key object or null if there isn't one
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
  KEYIN getCurrentKey() throws IOException, InterruptedException;
  /**
   * Get the current value.
   * @return the value object that was read into
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract VALUEIN getCurrentValue() throws IOException, 
                                                   InterruptedException;
  /**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    output.write(key, value);
  }
  public Counter getCounter(Enum<?> counterName) {
    return reporter.getCounter(counterName);
  }
  public Counter getCounter(String groupName, String counterName) {
    return reporter.getCounter(groupName, counterName);
  }
  @Override
  public void progress() {
    reporter.progress();
  }
  @Override
  public void setStatus(String status) {
    reporter.setStatus(status);
  }
  public OutputCommitter getOutputCommitter() {
    return committer;
  }
}

TaskInputOutputContext extends TaskAttemptContext and implements the Progressable interface. First, the code of the Progressable interface:

package org.apache.hadoop.util;
/**
 * A facility for reporting progress.
 * 
 * <p>Clients and/or applications can use the provided <code>Progressable</code>
 * to explicitly report progress to the Hadoop framework. This is especially
 * important for operations which take a significant amount of time since,
 * in lieu of the reported progress, the framework has to assume that an error
 * has occurred and time out the operation.</p>
 */
public interface Progressable {
  /**
   * Report progress to the Hadoop framework.
   */
  public void progress();
}
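The Javadoc above explains why long-running work should report progress: without it the framework has to assume an error and time the operation out. A minimal sketch of the pattern, assuming a callback with the same single-method shape, is shown below; `ProgressSketch` and `LongOperationSketch` are hypothetical names, not Hadoop types.

```java
// Hypothetical stand-in with the same single-method shape as Progressable.
interface ProgressSketch {
    void progress();
}

class LongOperationSketch {
    // Processes `total` items and pings the callback once every `interval` items.
    // Returns how many times progress was reported.
    static int process(int total, int interval, ProgressSketch reporter) {
        int pings = 0;
        for (int i = 1; i <= total; i++) {
            // ... expensive per-item work would go here ...
            if (i % interval == 0) {
                reporter.progress(); // tell the caller the work is still alive
                pings++;
            }
        }
        return pings;
    }

    public static void main(String[] args) {
        int pings = process(1000, 100, () -> System.out.println("still working"));
        System.out.println("reported progress " + pings + " times");
    }
}
```

Reporting every N items rather than on every item keeps the overhead of the callback small while still signalling liveness regularly.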

The code of the TaskAttemptContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/**
 * The context for task attempts.
 */
public class TaskAttemptContext extends JobContext implements Progressable {
  private final TaskAttemptID taskId;
  private String status = "";
  
  public TaskAttemptContext(Configuration conf, 
                            TaskAttemptID taskId) {
    super(conf, taskId.getJobID());
    this.taskId = taskId;
  }
  /**
   * Get the unique name for this task attempt.
   */
  public TaskAttemptID getTaskAttemptID() {
    return taskId;
  }

  /**
   * Set the current status of the task to the given string.
   */
  public void setStatus(String msg) throws IOException {
    status = msg;
  }
  /**
   * Get the last set status message.
   * @return the current status message
   */
  public String getStatus() {
    return status;
  }
  /**
   * Report progress. The subtypes actually do work in this method.
   */
  public void progress() { 
  }
}

TaskAttemptContext extends the JobContext class. Finally, the source code of JobContext:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * A read-only view of the job that is provided to the tasks while they
 * are running.
 */
public class JobContext {
  // Put all of the attribute names in here so that Job and JobContext are
  // consistent.
  protected static final String INPUT_FORMAT_CLASS_ATTR = 
    "mapreduce.inputformat.class";
  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
  protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
    "mapreduce.outputformat.class";
  protected static final String PARTITIONER_CLASS_ATTR = 
    "mapreduce.partitioner.class";

  protected final org.apache.hadoop.mapred.JobConf conf;
  private final JobID jobId;
  
  public JobContext(Configuration conf, JobID jobId) {
    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
    this.jobId = jobId;
  }
  /**
   * Return the configuration for the job.
   * @return the shared configuration object
   */
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Get the unique ID for the job.
   * @return the object with the job id
   */
  public JobID getJobID() {
    return jobId;
  }
  /**
   * Get the configured number of reduce tasks for this job. Defaults to 
   * <code>1</code>.
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() {
    return conf.getNumReduceTasks();
  }
  /**
   * Get the current working directory for the default file system.
   * 
   * @return the directory name.
   */
  public Path getWorkingDirectory() throws IOException {
    return conf.getWorkingDirectory();
  }
  /**
   * Get the key class for the job output data.
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return conf.getOutputKeyClass();
  }
  /**
   * Get the value class for job outputs.
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return conf.getOutputValueClass();
  }
  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    return conf.getMapOutputKeyClass();
  }
  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
   *  
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    return conf.getMapOutputValueClass();
  }
  /**
   * Get the user-specified job name. This is only used to identify the 
   * job to the user.
   * 
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return conf.getJobName();
  }
  /**
   * Get the {@link InputFormat} class for the job.
   * 
   * @return the {@link InputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }
  /**
   * Get the {@link Mapper} class for the job.
   * 
   * @return the {@link Mapper} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
     throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>) 
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
  }
  /**
   * Get the combiner class for the job.
   * 
   * @return the combiner class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Reducer<?,?,?,?>>) 
      conf.getClass(COMBINE_CLASS_ATTR, null);
  }
  /**
   * Get the {@link Reducer} class for the job.
   * 
   * @return the {@link Reducer} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Reducer<?,?,?,?>>) 
      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
  }
  /**
   * Get the {@link OutputFormat} class for the job.
   * 
   * @return the {@link OutputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends OutputFormat<?,?>>) 
      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
  }
  /**
   * Get the {@link Partitioner} class for the job.
   * 
   * @return the {@link Partitioner} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }
  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator<?> getSortComparator() {
    return conf.getOutputKeyComparator();
  }
  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }
  /** 
   * Get the user defined {@link RawComparator} comparator for 
   * grouping keys of inputs to the reduce.
   * 
   * @return comparator set by the user for grouping values.
   * @see Job#setGroupingComparatorClass(Class) for details.  
   */
  public RawComparator<?> getGroupingComparator() {
    return conf.getOutputValueGroupingComparator();
  }
}
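Most of the class getters in JobContext follow one pattern: look up a class under a property name and fall back to a default (TextInputFormat, Mapper, HashPartitioner, and so on) when nothing is configured. The sketch below imitates that lookup with a plain map; `ClassLookupSketch`, `getClassOrDefault` and the property name `sketch.list.class` are hypothetical, and JDK collection classes stand in for the Hadoop ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

class ClassLookupSketch {
    // Property name -> fully qualified class name, as a configuration would store it.
    private final Map<String, String> props = new HashMap<>();

    void setClass(String name, Class<?> cls) {
        props.put(name, cls.getName());
    }

    // Returns the configured class, or defaultValue when the property is unset.
    Class<?> getClassOrDefault(String name, Class<?> defaultValue) {
        String className = props.get(name);
        if (className == null) {
            return defaultValue;
        }
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            // Unchecked here to keep the sketch short; the getters above declare
            // ClassNotFoundException instead.
            throw new IllegalStateException("class not found: " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLookupSketch conf = new ClassLookupSketch();
        System.out.println(conf.getClassOrDefault("sketch.list.class", ArrayList.class));
        conf.setClass("sketch.list.class", LinkedList.class);
        System.out.println(conf.getClassOrDefault("sketch.list.class", ArrayList.class));
    }
}
```

Storing the class by name is what allows a job configuration to be serialized and shipped to the tasks, which then resolve the classes on their own classpath.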
