What decides the number of map tasks in Hive, i.e. the map count of the underlying MapReduce job? The input split size; so the key to tuning the map count is understanding how each input split's size is computed.
Hadoop exposes the InputFormat interface to describe the format of the input data; its key method, getSplits, is what cuts the input into splits.
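For reference, this is the shape of the old-API contract being discussed (a paraphrase of org.apache.hadoop.mapred.InputFormat, not a new definition): getSplits decides how the input is cut, and therefore how many map tasks run; getRecordReader then reads one split.

import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// paraphrase of the old-API org.apache.hadoop.mapred.InputFormat contract
public interface InputFormatContract<K, V> {
  // cut the input into splits; one map task is launched per split
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  // produce a reader for one split
  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException;
}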
Hive wraps InputFormat, and which concrete implementation is used is decided by the parameter hive.input.format; the two implementations that matter are HiveInputFormat and CombineHiveInputFormat.
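As a minimal illustration (assuming the usual fully-qualified class names), switching between the two looks like this; in a Hive session this is normally done with a plain "set hive.input.format=...", the JobConf here is only to keep the example in Java:

import org.apache.hadoop.mapred.JobConf;

// illustrative only: pick the input format Hive should use for this job
public class InputFormatChoice {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // one HiveInputSplit per split of each partition's own format:
    job.set("hive.input.format", "org.apache.hadoop.hive.ql.io.HiveInputFormat");
    // or combine small files of the same table/format into larger splits:
    job.set("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
    System.out.println(job.get("hive.input.format"));
  }
}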
For HiveInputFormat:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // scan every partition
  for (Path dir : dirs) {
    PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
    // get the partition's input format
    Class inputFormatClass = part.getInputFileFormatClass();
    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    // compute the splits with that format's own split algorithm
    // note: the InputFormat here is the old-API org.apache.hadoop.mapred one, not
    // org.apache.hadoop.mapreduce; a new-API format cannot be used here, otherwise the
    // query fails with "Input format must implement InputFormat". The difference is the
    // per-file split-size formula: the new API uses
    // Math.max(minSize, Math.min(maxSize, blockSize)) while the old API uses
    // Math.max(minSize, Math.min(goalSize, blockSize))
    InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
    for (InputSplit is : iss) {
      // wrap the splits and collect them
      result.add(new HiveInputSplit(is, inputFormatClass.getName()));
    }
  }
  return result.toArray(new HiveInputSplit[result.size()]);
}
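To make the comment about the two formulas concrete, here is a small sketch of both per-file split-size computations (the numbers are illustrative, not defaults):

// a sketch of the two split-size formulas used by Hadoop's FileInputFormat
public class SplitSizeFormulas {
  // old API (org.apache.hadoop.mapred): goalSize = totalSize / numSplits
  static long oldApiSplitSize(long minSize, long goalSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  // new API (org.apache.hadoop.mapreduce): maxSize comes from
  // mapreduce.input.fileinputformat.split.maxsize
  static long newApiSplitSize(long minSize, long maxSize, long blockSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L << 20;                  // 128 MB HDFS block
    long totalSize = 10L * (1L << 30);            // a 10 GB input file
    long goalSize = totalSize / 100;              // old API asked for 100 splits
    System.out.println(oldApiSplitSize(1, goalSize, blockSize));    // ~102 MB
    System.out.println(newApiSplitSize(1, 256L << 20, blockSize));  // 128 MB, capped by block size
  }
}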
For CombineHiveInputFormat the computation is considerably more involved:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // load CombineFileInputFormatShim, which extends org.apache.hadoop.mapred.lib.CombineFileInputFormat
  CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
      .getCombineFileInputFormat();
  if (combine == null) {
    // if the shim is unavailable, fall back to the HiveInputFormat behavior (same below)
    return super.getSplits(job, numSplits);
  }
  Path[] paths = combine.getInputPathsShim(job);
  for (Path path : paths) {
    // non-native tables (storage-handler backed) are split the HiveInputFormat way
    if ((tableDesc != null) && tableDesc.isNonNative()) {
      return super.getSplits(job, numSplits);
    }
    Class inputFormatClass = part.getInputFileFormatClass();
    String inputFormatClassName = inputFormatClass.getName();
    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
      if (inputFormat instanceof TextInputFormat) {
        if ((new CompressionCodecFactory(job)).getCodec(path) != null)
          // when hive.hadoop.supports.splittable.combineinputformat (MAPREDUCE-1597) is not
          // enabled, compressed TextInputFormat input falls back to the HiveInputFormat split algorithm
          return super.getSplits(job, numSplits);
      }
    }
    // same fallback for symlink text input
    if (inputFormat instanceof SymlinkTextInputFormat) {
      return super.getSplits(job, numSplits);
    }
    CombineFilter f = null;
    boolean done = false;
    Path filterPath = path;
    // controlled by hive.mapper.cannot.span.multiple.partitions (default false); when it is
    // true, one pool is created per partition (that branch is omitted here); otherwise one
    // pool is created per same table / same file format, so those splits can be combined together
    if (!mrwork.isMapperCannotSpanPartns()) {
      opList = HiveFileFormatUtils.doGetWorksFromPath(
          pathToAliases, aliasToWork, filterPath);
      f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
    }
    if (!done) {
      if (f == null) {
        f = new CombineFilter(filterPath);
        combine.createPool(job, f);
      } else {
        f.addPath(filterPath);
      }
    }
  }
  if (!mrwork.isMapperCannotSpanPartns()) {
    // only here is combine's split algorithm invoked; the shim extends
    // org.apache.hadoop.mapred.lib.CombineFileInputFormat, which in turn extends the
    // new-API CombineFileInputFormat
    iss = Arrays.asList(combine.getSplits(job, 1));
  }
  // special handling for sampled queries
  if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
    iss = sampleSplits(iss);
  }
  // wrap the splits and return them
  for (InputSplitShim is : iss) {
    CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
    result.add(csplit);
  }
  return result.toArray(new CombineHiveInputSplit[result.size()]);
}
The combine getSplits algorithm itself is as follows:
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // the parameters that decide how blocks are combined into splits
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  for (MultiPathFilter onepool : pools) {
    ArrayList<Path> myPaths = new ArrayList<Path>();
    // create splits for all files in this pool.
    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
        maxSize, minSizeNode, minSizeRack, splits);
  }
}
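Before following the code into getMoreSplits, here is a hedged example of setting the three knobs it just read. The property names below are the ones visible in the code and its constants (in recent Hadoop, SPLIT_MINSIZE_PERNODE and SPLIT_MINSIZE_PERRACK resolve to the .per.node / .per.rack keys); older setups use the deprecated aliases mapred.max.split.size, mapred.min.split.size.per.node and mapred.min.split.size.per.rack.

import org.apache.hadoop.mapred.JobConf;

// illustrative sizes only; in a Hive session these are usually set with "set <key>=<value>"
public class CombineSizeKnobs {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);          // maxSize
    job.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024 * 1024);  // minSizeNode
    job.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 32L * 1024 * 1024);  // minSizeRack
    System.out.println(job.get("mapreduce.input.fileinputformat.split.maxsize"));
  }
}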
Stepping into getMoreSplits: its main job is to populate the following data structures:
// all blocks for all the files in input set
OneFileInfo[] files;

// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
    new HashMap<String, List<OneBlockInfo>>();

// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
    new HashMap<OneBlockInfo, String[]>();

// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
    new HashMap<String, List<OneBlockInfo>>();
The rough flow is as follows (how the block infos themselves are built is skipped here, see MAPREDUCE-2046); a simplified sketch follows the list:
1. First, each DataNode's block infos are processed: splits are cut whenever the accumulated size reaches maxSplitSize; of what remains, a split is cut if the accumulated block infos reach minSplitSizeNode; anything still left waits to be merged with the remaining block infos of its rack.
2. Next, each rack is processed the same way: splits are cut at maxSplitSize; of what remains, a split is cut if the accumulated block infos reach minSplitSizeRack; the rest waits to be merged with the remaining overflow block infos.
3. Finally, the overflow block infos are cut into splits by maxSplitSize alone.
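A simplified, self-contained sketch of that three-step packing, based only on the behavior described above (this is not the real CombineFileInputFormat code, which tracks individual blocks rather than running sums):

import java.util.ArrayList;
import java.util.List;

public class CombineSketch {
  // pack a list of block sizes into splits, closing a split whenever it reaches maxSize;
  // a trailing remainder smaller than minSize goes into leftover instead of being emitted
  static void pack(List<Long> blocks, long maxSize, long minSize,
                   List<Long> splits, List<Long> leftover) {
    long current = 0;
    for (long b : blocks) {
      current += b;
      if (maxSize > 0 && current >= maxSize) {
        splits.add(current);
        current = 0;
      }
    }
    if (current > 0) {
      if (current >= minSize) {
        splits.add(current);
      } else {
        leftover.add(current);   // wait to be merged at the next level
      }
    }
  }

  public static void main(String[] args) {
    long maxSize = 256L << 20, minSizeNode = 64L << 20, minSizeRack = 32L << 20;
    List<Long> splits = new ArrayList<>();

    // step 1: per node
    List<Long> rackLeftover = new ArrayList<>();
    List<Long> nodeBlocks = List.of(128L << 20, 128L << 20, 10L << 20);
    pack(nodeBlocks, maxSize, minSizeNode, splits, rackLeftover);

    // step 2: per rack, using what the nodes left behind
    List<Long> overflow = new ArrayList<>();
    pack(rackLeftover, maxSize, minSizeRack, splits, overflow);

    // step 3: overflow is cut purely by maxSize (a minSize of 0 means "always emit")
    pack(overflow, maxSize, 0, splits, new ArrayList<>());

    System.out.println("split sizes: " + splits);
  }
}

The practical effect for tuning is that maxSplitSize caps how large a combined split (and thus how few maps) you can get, while the per-node and per-rack minimums decide how much leftover data is allowed to float up to the rack and overflow stages before it is finally packed.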
The other parameters that influence the map count are easier to understand:
1. Whether an extra map-only job is launched after the MapReduce job to merge its output files (a sketch of the decision follows the parameters below):
hive.merge.mapfiles, hive.merge.mapredfiles, hive.merge.size.per.task (default = 256 * 1000 * 1000), hive.merge.smallfiles.avgsize (default = 16 * 1000 * 1000)
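A hedged sketch of the merge decision these parameters drive, assuming the commonly documented semantics: when merging is enabled and the average size of the job's output files is below hive.merge.smallfiles.avgsize, an extra map-only merge job is launched whose target size per mapper is hive.merge.size.per.task. The helper below is illustrative, not Hive source.

public class MergeDecisionSketch {
  static boolean shouldMerge(long totalOutputBytes, int numOutputFiles,
                             boolean mergeEnabled, long smallfilesAvgSize) {
    if (!mergeEnabled || numOutputFiles == 0) {
      return false;
    }
    long avgSize = totalOutputBytes / numOutputFiles;
    return avgSize < smallfilesAvgSize;        // merge only when files are small on average
  }

  public static void main(String[] args) {
    long sizePerTask = 256L * 1000 * 1000;     // hive.merge.size.per.task default
    long smallfilesAvg = 16L * 1000 * 1000;    // hive.merge.smallfiles.avgsize default
    // 40 files totalling 200 MB -> average 5 MB < 16 MB, so a merge job would be launched
    System.out.println(shouldMerge(200L * 1000 * 1000, 40, true, smallfilesAvg)
        + ", merge target size per mapper = " + sizePerTask);
  }
}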
2. Whether skew handling spawns additional maps:
hive.groupby.skewindata=false:
when this parameter is set to true, two MR jobs are generated:
the first MR partitions by grouping key + distinct key, hashing rows across reducers for the first-stage aggregation;
the second MR partitions by grouping key alone for the second-stage aggregation (both MR jobs sort by grouping key + distinct key);
https://issues.apache.org/jira/browse/HIVE-5118
hive.optimize.skewjoin=false
hive.optimize.skewjoin.compiletime=false
hive.skewjoin.key=100000
hive.skewjoin.mapjoin.map.tasks=10000
hive.skewjoin.mapjoin.min.split=33554432
3. MapReduce settings, such as whether map speculative execution is enabled.
4. Bucketed tables.
Performance analysis of the MAP/REDUCE phases is left for the next post.
This article was reposted from MIKE老毕's 51CTO blog, original link: http://blog.51cto.com/boylook/1316432; please contact the original author for reprint permission.