本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.4.4节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
2.4.4 MapReduce组件分析与编程实践
MapReduce整个流程包括以下步骤:输入格式(InputFormat)、Mapper、Combiner、Partitioner、Reducer、输出格式(OutputFormat)。这里会针对流程中的Combiner、Part-itioner、输入/输出格式进行分析,同时,也会介绍相关的编程技巧,如自定义键值对。
1. Combiner分析
Combiner是什么呢?从字面意思理解,Combine即合并。其实,Combiner就是对Mapper的输出进行一定的合并,减少网络输出的组件。所以,其去掉与否不影响最终结果,影响的只是性能。
Combiner是Mapper端的汇总,然后才通过网络发向Reducer。如图2-40所示,经过Combiner后,键值对,被合并为,这样发往Reducer的记录就可以减少一条(当然,实际中肯定不是只减少一条记录),从而减少了网络IO。
对于多个输入数据块,每个数据块产生一个InputSplit,每个InputSplit对应一个map任务,每个map任务会对应0个到多个Combiner,最后再汇总到Reducer。在单词计数的例子中,使用Combiner的情形如图2-41所示。
需要注意的是,自定义Combiner也是需要集成Reducer的,同样也需要在reduce函数中写入处理逻辑。但是要注意,Combiner的输入键值对格式与输出键值对格式必须保持一致,也正是因为这个要求,很多情况下,采用自定义Combiner的方式在业务或算法处理上行不通。还有,在单词计数程序中,Combiner和Reducer使用的是同一个类代码,这是可能的,但是大多数情况下不能这样做,因为Reducer和Combiner的逻辑在很多情况下是不一样的。
2. Partitioner分析
Partitioner是来做什么的呢?是用来提高性能的吗?非也!Partitioner主要的目的是把键值对分给不同的Reducer。分给不同的Reducer?难道Reducer可以有多个吗?这是当然的,只需要在初始化Job实例的时候进行设置即可,例如设置代码为job.setNum-ReduceTasks(3),这样就可以设置3个Reducer了。
经过前面的分析可以知道,在Reducer的输入端,其键值对组是按照一个键对应一个值列表的。如果同一个键的不同值被发送到了不同的Reducer中,那么(注意,每个Reducer在一个子节点运行,不同Reducer之间不会干扰),经过不同的Reducer处理后,其实我们已经做不到针对一个键,输出一个值了,而是输出了两条记录。我们可以看下Hadoop系统默认的Partitioner实现,默认的Partitioner是HashPartitioner,其源码如代码清单2-30所示。
代码清单2-30 HashPartitioner源码
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
在源码中,可以看到HashPartitiner中只有一个方法,就是getPartition(K key,V value, int numReducTasks)。3个参数分别为键、值、Reducer的个数,输出其实就是Reducer的ID。从代码的实现中可以看出,最终输出的Reducer ID只与键(key)的值有关,这样也就保证了同样的键会被发送到同一个Reducer中处理。
同一个键的记录会被发送到同一个Reducer中处理,一个Reducer可以处理不同的键的记录。
3.输入输出格式/键值类型
一般来说,HDFS一个文件对应多个文件块,每个文件块对应一个InputSplit,而一个InputSplit就对应一个Mapper任务,每个Mapper任务在一个节点上运行,其仅处理当前文件块的数据,但是我们编写Mapper的时候只是关心输入键值对,而不是关心输入文件块。那么,文件块怎么被处理成了键值对呢?这就是Hadoop的输入格式要做的工作了。
在InputFormat中定义了如何分割以及如何进行数据读取从而得到键值对的实现方式,它有一个子类FileInputFormat,如果要自定义输入格式,一般都会集成它的子类File-InputFormat,它里面帮我们实现了很多基本的操作,比如记录跨文件块的处理等。
图2-42所示是InputFormat的类继承结构。
然而,比较常用的则是如表2-7所示的几个实现方式。
同理,可以想象,输出格式(OutputFormat)也与输入格式相同,不过是输入格式的逆过程:把键值对写入HDFS中的文件块中。如图2-43所示是OutputFormat的类继承结构。
同样,比较常用的方式如表2-8所示。
在Hadoop中,无论是Mapper或Reducer处理的都是键值对记录,那么Hadoop中有哪些键值对类型呢?Hadoop中常用的键值对类型如图2-44所示。
从各个类的命名上其实也可以看出其代表什么类型,比如LongWritable,代表的就是Long的实现,而Text就是String的实现。在前面的单词计数中我们使用过IntWritable以及Text。
这里有两点需要注意:
1)值类型都需实现Writale接口;
2)键需要实现WritableComparable接口。
其实从图2-44中也可以看出,Hadoop已有的键值类型都是实现WritableComparable接口的,然而WritableComparable接口又是实现Writable接口的。所以,Hadoop已有的键值类型既可以作为键类型也可以作为值类型。作为键类型的肯定可以作为值类型,但作为值类型的却不能作为键类型。为什么键类型是实现WritableComparable接口呢?其实,如果你联想到了Shuffle/Sort过程的话,应该不难理解,因为MapReduce框架需要在这里对键进行排序。
4.动手实践:指定输入输出格式
这个实验主要是加深理解Hadoop的输入/输出格式,熟悉常用的SequenceFileInput-Format和SequenceFileOutputFormat。
实验步骤:
1)打开Eclipse,打开已经完成的WordCount程序;
2)设置输出格式为SequenceFileOutputFormat,重新打包,并提交到Linux上运行;
3)查看输出的文件;
4)再次修改WordCount程序,设置输入格式为SequenceFileInputFormat、输入路径为3的输出;设置输出格式为TextFileInputFormat;
5)查看输出结果;
6)针对上面的各个步骤以及输出进行分析,解释对应的输出结构。
思考:
1)第4步中查看的文件是否是乱码?如果是乱码,为什么是乱码?针对这样的数据,如何使用HDFS Java API进行读取?如果不是乱码,看到的是什么?
2)使用SequenceFileInputFormat或SequenceFileOutputFormat有什么优势与劣势?
5.自定义键值类型
Hadoop已经定义了很多键值类型,比如Text、IntWritable、LongWritable等,那为什么需要用到自定键值类型呢?答案其实很简单,不够用。在有些情况下,我们需要一些特殊的键值类型来满足我们的业务需求,这种时候就需要自定义键值类型了。前面已经提到,自定义键需要实现WritableComparable接口,自定义值需要实现Writable接口,那么实现了接口后,还需要做哪些操作呢?
自定义值类型可参考代码清单2-31进行分析。
**代码清单2-31 自定义Hadoop 值类型
public class MyWritable implements Writable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
@Override
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp=in.readLong();
}**
}
在代码清单2-31中,首先实现了Writable接口,接着定义了两个变量。这两个变量其实是与业务相关的(比如,这里定义了一个counter,一个timestamp)。实现了Writable接口后,需要覆写两个方法(write和readFields),这里需要注意写入和读取的顺序是很重要的,比如这里先把counter写入out输出流,再把timestamp写入out输出流。那么,在读取的时候就需要先读取counter,再读取timestamp(如果两个变量都是int型,那么就更加需要注意区分)。
自定义键类型可参考代码清单2-32进行分析。
**代码清单2-32 自定义Hadoop 键类型
public class MyWritableComparable implements WritableComparable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
@Override
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp= in.readLong();
}
@Override
public int compareTo(MyWritableComparable other) {
if(this.counter == other.counter){
return (int)(this.timestamp - other.timestamp);
}
return this.counter-other.counter;
}
}**
从代码清单2-32中可以看出,自定义键类型其实就是比自定义值类型多了一个比较方法而已,其他都是一样的。
6.动手实践:自定义键值类型
针对source/hadoop/keyvalue.data数据求解每行数据的个数以及平均值,该数据格式如表2-9所示。
1)编写Driver程序,main函数接收两个参数和,设置输入格式为KeyValueInputFormat;
2)编写Mapper程序,map函数针对每个value值,使用‘t’进行分隔;接着,对分隔后的数据进行求和以及个数统计(注意将字符串转换为数值),输出平均值和个数,Mapper输出键值对类型为;
3)编写自定义value类型MyValue,定义两个字段,一个是average,一个是num,用于存储平均值和个数;重写toString方法;
4)编写Reducer程序,直接输出即可;
5)对编写的程序进行打包averagejob.jar;
6)上传source/hadoop/keyvalue.data到HDFS,上传averagejob.jar到Linux;
7)使用命令hadoop jar averagejob.jar进行调用;
8)查看输出结果。
思考:
1)Reducer类是否必需?如果不需要,则如何修改?如果去掉reducer,输出结果会有什么不一样?
2)如果想让程序可以直接在Eclipse中运行,应该如何修改程序?