通过前面的内容介绍相信大家对于MapReduce的操作有了一定的了解,通过客户端源码的分析也清楚了split是逻辑分区,记录了每个分区对应的是哪个文件,从什么位置开始到什么位置介绍,而且一个split对应一个Map Task任务,而MapTask具体是怎么读取文件的呢?本文来具体分析下。
MapTask读取数据的过程
我们要分析的就是如下的过程:
1.自定义Mapper
在自定义的Mapper中我们只需要重写map方法,那么每读取一行记录就会调用一次该方法,如下
2.查看Mapper源码
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
通过源码我们能够看到里面的方法如下
3.context.nextKeyValue()
context的类型是Context实现了MapContext
public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { }
MapContext的实现类是MapContextImpl
而nextKeyValue()方法的调用
说明调用的是RecordReader中的方法,而具体是RecordReader中的哪个实现类呢?继续往下。
4.FileInputFormat
我们在启动类中设置了输入输出路径。进入FileInputFormat的子类TextFileInputFormat中查看
说明nextKeyValue()其实执行的是RecordReader中的nextKeyValue方法。
读取split文件中每行数据的方法。将每行的偏移量保存在key中,每行的具体数据保存在value中,分别通过getCurrentKey方法和getCurrentValue方法来获取。
public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
自此我们简单的分析了下map具体是怎么一行一行的读取数据的了。MapReduce的其他阶段我们后续再慢慢分析