1. The Mapper class
* Maps input key/value pairs to a set of intermediate key/value pairs.
*
* <p>Maps are the individual tasks which transform input records into
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.</p>
*
* <p>The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* <code>Mapper</code> implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
*
* <p>The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
* for each key/value pair in the <code>InputSplit</code>. Finally
* {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
*
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
*
* <p>The <code>Mapper</code> outputs are partitioned per
* <code>Reducer</code>. Users can control which keys (and hence records) go to
* which <code>Reducer</code> by implementing a custom {@link Partitioner}.
*
* <p>Users can optionally specify a <code>combiner</code>, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the <code>Mapper</code> to the <code>Reducer</code>.
*
* <p>Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the <code>Configuration</code>.</p>
*
* <p>If the job has zero
* reduces then the output of the <code>Mapper</code> is directly written
* to the {@link OutputFormat} without sorting by keys.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class TokenCounterMapper
*     extends Mapper<Object, Text, Text, IntWritable> {
*
*   private final static IntWritable one = new IntWritable(1);
*   private Text word = new Text();
*
*   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
*     StringTokenizer itr = new StringTokenizer(value.toString());
*     while (itr.hasMoreTokens()) {
*       word.set(itr.nextToken());
*       context.write(word, one);
*     }
*   }
* }
* </pre></blockquote>
*
* <p>Applications may override the
* {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
* greater control on map processing e.g. multi-threaded <code>Mapper</code>s
* etc.</p>
In short (from the Javadoc above): the Mapper maps input key/value pairs to a set of intermediate key/value pairs. Map tasks transform input records into intermediate records, which need not be of the same type as the input; a given input pair may map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the job's InputFormat. A Mapper implementation can access the job's Configuration via JobContext#getConfiguration().

Mapper call sequence: the framework first calls setup(Context), then map(Object, Object, Context) once for every key/value pair in the InputSplit, and finally cleanup(Context).

All intermediate values associated with a given output key are subsequently grouped by the framework and passed to a Reducer to determine the final output; sorting and grouping can be controlled by specifying two RawComparator classes.

1. Partitioner: Mapper output is partitioned per Reducer; users control which keys (and hence which records) go to which Reducer by implementing a custom Partitioner.

2. Combiner (local pre-aggregation): users can optionally specify a combiner via Job#setCombinerClass(Class) to perform local aggregation of the intermediate output, which cuts down the amount of data transferred from the Mapper to the Reducer.

3. Compression: applications can specify whether and how the intermediate output is compressed, and which CompressionCodec is used, via the Configuration.
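For concreteness, here is a minimal configuration sketch for these three knobs (combiner, partitioner, map-output compression). It only uses standard Hadoop classes, but the wiring itself is illustrative rather than taken from this post; imports are omitted, and SnappyCodec requires the native Snappy library.

Configuration conf = new Configuration();
// Compress the intermediate (map) output and choose the codec via the Configuration.
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
              org.apache.hadoop.io.compress.SnappyCodec.class,
              org.apache.hadoop.io.compress.CompressionCodec.class);

Job job = Job.getInstance(conf, "token count");
job.setMapperClass(TokenCounterMapper.class);     // the example Mapper from the Javadoc above
job.setCombinerClass(IntSumReducer.class);        // 2. local pre-aggregation of map output
job.setPartitionerClass(HashPartitioner.class);   // 1. decides which keys go to which Reducer (this is the default)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);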
Core Mapper invocation (Mapper.run)
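The core invocation referred to here is Mapper.run(), whose loop in the Hadoop source is essentially the following (reproduced from memory, so treat it as approximate):

// Essentially Mapper.run(): setup() once, map() for every key/value pair
// delivered by the context, cleanup() at the end.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}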
The Reducer class
* Reduces a set of intermediate values which share a key to a smaller set of
* values.
*
* <p><code>Reducer</code> implementations
* can access the {@link Configuration} for the job via the
* {@link JobContext#getConfiguration()} method.</p>
* <p><code>Reducer</code> has 3 primary phases:</p>
* <ol>
* <li>
*
* <b id="Shuffle">Shuffle</b>
*
* <p>The <code>Reducer</code> copies the sorted output from each
* {@link Mapper} using HTTP across the network.</p>
* </li>
*
* <li>
* <b id="Sort">Sort</b>
*
* <p>The framework merge sorts <code>Reducer</code> inputs by
* <code>key</code>s
* (since different <code>Mapper</code>s may have output the same key).</p>
*
* <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
* being fetched they are merged.</p>
*
* <b id="SecondarySort">SecondarySort</b>
*
* <p>To achieve a secondary sort on the values returned by the value
* iterator, the application should extend the key with the secondary
* key and define a grouping comparator. The keys will be sorted using the
* entire key, but will be grouped using the grouping comparator to decide
* which keys and values are sent in the same call to reduce. The grouping
* comparator is specified via
* {@link Job#setGroupingComparatorClass(Class)}. The sort order is
* controlled by
* {@link Job#setSortComparatorClass(Class)}.</p>
In short (from the Javadoc above): a Reducer reduces a set of intermediate values that share a key to a smaller set of values. Reducer implementations can access the job's Configuration via the JobContext#getConfiguration() method. Reducer has three primary phases:

1. Shuffle: the Reducer copies the sorted output of each Mapper across the network over HTTP.

2. Sort: the framework merge-sorts the Reducer inputs by key (since different Mappers may have emitted the same key). The shuffle and sort phases happen simultaneously: outputs are merged while they are being fetched.

Secondary sort: to achieve a secondary sort on the values returned by the value iterator, extend the key with the secondary key and define a grouping comparator. Keys are then sorted using the entire (composite) key but grouped with the grouping comparator, which decides which keys and values end up in the same reduce() call. The grouping comparator is specified via Job#setGroupingComparatorClass(Class); the sort order is controlled by Job#setSortComparatorClass(Class).
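A minimal, hypothetical wiring of that secondary-sort recipe, given a Job instance job; the composite key and the three comparator/partitioner classes are placeholders the reader would write, not classes from this post or from Hadoop:

// Hypothetical sketch: the map output key is a composite of (naturalKey, secondaryKey).
job.setMapOutputKeyClass(CompositeKeyWritable.class);               // user-defined composite key (placeholder)
job.setSortComparatorClass(CompositeKeyComparator.class);           // sorts by natural key, then secondary key
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); // groups reduce() calls by natural key only
job.setPartitionerClass(NaturalKeyPartitioner.class);               // same natural key -> same Reducer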
Core Reducer invocation (Reducer.run)
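The corresponding core loop is Reducer.run(), which in the Hadoop source looks essentially like this (slightly abridged; the real method also resets the value iterator's backup store after each key):

// Essentially Reducer.run(): setup() once, one reduce() call per key group, cleanup() at the end.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
  } finally {
    cleanup(context);
  }
}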
Rough overall processing flow
Job submission and execution flow
Determining the ClientProtocol by comparison
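A rough sketch of how that selection happens, paraphrased from Cluster.initialize() (details approximate): each ClientProtocolProvider discovered via ServiceLoader checks the configuration, and the first one that returns a non-null ClientProtocol wins.

// Paraphrased sketch of Cluster.initialize(): LocalClientProtocolProvider and
// YarnClientProtocolProvider each compare the configured mapreduce.framework.name
// against what they support; the first matching provider supplies the ClientProtocol.
for (ClientProtocolProvider provider : frameworkLoader) {
  ClientProtocol clientProtocol = provider.create(conf);  // null if this provider does not match the config
  if (clientProtocol != null) {
    client = clientProtocol;                              // LocalJobRunner or YARNRunner
    break;
  }
}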
The content of JobSubmitter.submitJobInternal is sketched below.
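A heavily abridged paraphrase of submitJobInternal (method names recalled from the Hadoop source and possibly approximate; error handling and most details omitted):

// Heavily abridged paraphrase of JobSubmitter.submitJobInternal(Job, Cluster).
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  checkSpecs(job);                                          // validate the output specification
  Configuration conf = job.getConfiguration();
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  JobID jobId = submitClient.getNewJobID();                 // ask the ClientProtocol for a new job id
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  copyAndConfigureFiles(job, submitJobDir);                 // 1. upload -files / -libjars / -archives and the job jar
  int maps = writeSplits(job, submitJobDir);                // 2. compute the logical input splits and write them out
  writeConf(conf, JobSubmissionFiles.getJobConfPath(submitJobDir)); // 3. write job.xml
  return submitClient.submitJob(                            // 4. hand the job to LocalJobRunner / YARNRunner
      jobId, submitJobDir.toString(), job.getCredentials());
}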
1. Upload the jar to a temporary (staging) directory on the cluster
The jars, files, and archives are uploaded to the staging path above.
In my run there is no extra jar or archive, so this part is empty.
2. Logically split the input files according to the job
Key point 1: which InputFormat is used to read the data
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // By default minSize = 1 and maxSize = Long.MAX_VALUE
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  // ... (omitted) ...
  // Iterate over all files under the input paths
  for (FileStatus file: files) {
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Check whether the file is splittable
      if (isSplitable(job, path)) {
        // Get the block size (32M by default on the local FS, 128M by default on a cluster; configurable)
        long blockSize = file.getBlockSize();
        // Compute the split size from blockSize, minSize and maxSize
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        // Bytes of the file still to be split
        long bytesRemaining = length;
        // While remaining bytes / splitSize (32M in this local run) > SPLIT_SLOP (1.1), keep splitting
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // Determine the split: find the block index at the split's starting offset.
          // E.g. a 40M file with 32M splits: the first 32M chunk becomes one split, leaving 8M.
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  // Return the list of splits
  return splits;
}
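The split size used above comes from FileInputFormat.computeSplitSize(); with the default minSize/maxSize it simply returns the block size. Its one-liner and a quick worked example (my numbers, assuming the defaults):

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
// Worked example, assuming the defaults minSize = 1 and maxSize = Long.MAX_VALUE,
// so splitSize = blockSize = 128M on a typical cluster:
//   300M file: 300/128 = 2.34 > 1.1 -> 128M split; 172/128 = 1.34 > 1.1 -> 128M split;
//              44/128 = 0.34 <= 1.1 -> final 44M split  => 3 splits in total.
//   130M file: 130/128 = 1.02 <= 1.1 -> a single 130M split (SPLIT_SLOP avoids a tiny 2M tail split).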
After the job is submitted, it is run by a YARNRunner or a LocalJobRunner; each MapTask then calls Mapper.run() to do the actual map work.
And that job runner is exactly the ClientProtocol we determined earlier.
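For completeness, the hand-off inside MapTask looks roughly like this (a heavily abridged paraphrase of MapTask.runNewMapper(); the setup of the context and output objects is omitted, and names should be treated as approximate):

// Heavily abridged paraphrase of MapTask.runNewMapper(): build the Mapper via reflection,
// open the RecordReader on this task's InputSplit, then drive it through Mapper.run().
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
    ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
input.initialize(split, mapperContext);   // RecordReader for this task's InputSplit
mapper.run(mapperContext);                // setup() -> map() per record -> cleanup()
input.close();
output.close(mapperContext);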