控制Hive MAP个数详解

简介:

HiveMAP数或者说MAPREDUCEMAP数是由谁来决定的呢?inputsplit size,那么对于每一个inputsplit size是如何计算出来的,这是做MAP数调整的关键.

HADOOP给出了Inputformat接口用于描述输入数据的格式,其中一个关键的方法就是getSplits,对输入的数据进行分片.

HiveInputFormat进行了封装:

213505502.png

而具体采用的实现是由参数hive.input.format来决定的,主要使用2中类型HiveInputFormatCombineHiveInputFormat.

对于HiveInputFormat来说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  InputSplit[] getSplits(JobConf job,  int  numSplits)  throws  IOException {
     //扫描每一个分区
     for  (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
     //获取分区的输入格式
       Class inputFormatClass = part.getInputFileFormatClass();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
     //按照相应格式的分片算法获取分片
     //注意:这里的Inputformat只是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,因此不能采用新的API,否则在查询时会报异常:Input format must implement InputFormat.区别就是新的API的计算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一样;
       InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
       for  (InputSplit is : iss) {
     //封装结果,返回
         result.add( new  HiveInputSplit(is, inputFormatClass.getName()));
       }
     }
     return  result.toArray( new  HiveInputSplit[result.size()]);
}


对于CombineHiveInputFormat来说的计算就比较复杂了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public  InputSplit[] getSplits(JobConf job,  int  numSplits)  throws  IOException {
     //加载CombineFileInputFormatShim,这个类继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
if  (combine ==  null ) {
//若为空则采用HiveInputFormat的方式,下同
       return  super .getSplits(job, numSplits);
     }
     Path[] paths = combine.getInputPathsShim(job);
for  (Path path : paths) {
//若是外部表,则按照HiveInputFormat方式分片
       if  ((tableDesc !=  null ) && tableDesc.isNonNative()) {
         return  super .getSplits(job, numSplits);
       }
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
       if  ( this .mrwork !=  null  && ! this .mrwork.getHadoopSupportsSplittable()) {
         if  (inputFormat  instanceof  TextInputFormat) {
          if  (( new  CompressionCodecFactory(job)).getCodec(path) !=  null )
//在未开启hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)参数情况下,对于TextInputFormat并且为压缩则采用HiveInputFormat分片算法
                     return  super .getSplits(job, numSplits);
         }
       }
     //对于连接式同上
       if  (inputFormat  instanceof  SymlinkTextInputFormat) {
         return  super .getSplits(job, numSplits);
       }
       CombineFilter f =  null ;
       boolean  done =  false ;
Path filterPath = path;
//由参数hive.mapper.cannot.span.multiple.partitions控制,默认false;如果没true,则对每一个partition创建一个pool,以下省略为true的处理;对于同一个表的同一个文件格式的split创建一个pool为combine做准备;
       if  (!mrwork.isMapperCannotSpanPartns()) {
         opList = HiveFileFormatUtils.doGetWorksFromPath(
                    pathToAliases, aliasToWork, filterPath);
         f = poolMap.get( new  CombinePathInputFormat(opList, inputFormatClassName));
       }
       if  (!done) {
         if  (f ==  null ) {
           f =  new  CombineFilter(filterPath);
           combine.createPool(job, f);
         else  {
           f.addPath(filterPath);
         }
       }
     }
if  (!mrwork.isMapperCannotSpanPartns()) {
//到这里才调用combine的分片算法,继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat
       iss = Arrays.asList(combine.getSplits(job,  1 ));
}
//对于sample查询特殊处理
     if  (mrwork.getNameToSplitSample() !=  null  && !mrwork.getNameToSplitSample().isEmpty()) {
       iss = sampleSplits(iss);
}
//封装结果返回
     for  (InputSplitShim is : iss) {
       CombineHiveInputSplit csplit =  new  CombineHiveInputSplit(job, is);
       result.add(csplit);
     }
     return  result.toArray( new  CombineHiveInputSplit[result.size()]);
   }

具体combinegetSplits算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  List<InputSplit> getSplits(JobContext job)
     throws  IOException {
         //决定切分的几个参数
     if  (minSplitSizeNode !=  0 ) {
       minSizeNode = minSplitSizeNode;
     else  {
       minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE,  0 );
     }
     if  (minSplitSizeRack !=  0 ) {
       minSizeRack = minSplitSizeRack;
     else  {
       minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK,  0 );
     }
     if  (maxSplitSize !=  0 ) {
       maxSize = maxSplitSize;
     else  {
       maxSize= = conf.getLong( "mapreduce.input.fileinputformat.split.maxsize" 0 );
     }
         for  (MultiPathFilter onepool : pools) {
       ArrayList<Path> myPaths =  new  ArrayList<Path>();
       // create splits for all files in this pool.
       getMoreSplits(job, myPaths.toArray( new  Path[myPaths.size()]),
                     maxSize, minSizeNode, minSizeRack, splits);
     }
}

跳到getMoreSplits:主要是填充如下数据结构,

1
2
3
4
5
6
7
8
// all blocks for all the files in input set
     OneFileInfo[] files;
     // mapping from a rack name to the list of blocks it has
     HashMap<String, List<OneBlockInfo>> rackToBlocks =  new  HashMap<String, List<OneBlockInfo>>();
     // mapping from a block to the nodes on which it has replicas
     HashMap<OneBlockInfo, String[]> blockToNodes =  new  HashMap<OneBlockInfo, String[]>();
     // mapping from a node to the list of blocks that it contains
     HashMap<String, List<OneBlockInfo>> nodeToBlocks =  new  HashMap<String, List<OneBlockInfo>>();


大概流程则是(这里blockInfo生成略过不表,可以参考MAPREDUCE-2046):

1.首先处理每个DatanodeblockInfo,先按照>=maxsplitsize来切分split,剩余的再按照blockinfo>=minSplitSizeNode切分,其余的等和rack的其余blockinfo进行合并

2.其次对每个Rack进行处理:先按照>=maxsplitsize来切分split,剩余的再按照blockinfo>=minSplitSizeRack切分,其余的等和overflow的其余blockinfo进行合并

3.对于overflow blockInfo直接根据maxsplitsize来进行切分.

其余影响MAP数的参数比较好理解了:

1.影响在MAPREDUCE后是否会启动MAP进行文件合并

hive.merge.mapfiles,hive.merge.mapredfiles,hive.merge.size.per.task(default=256 * 1000 * 1000),hive.merge.smallfiles.avgsize(default=16 * 1000 * 1000)

2.影响是否存在skew开启多MAP:

hive.groupby.skewindata=false:

当该参数有true时会生成2个MR:

第一个MR的分区键是grouping key+distinct key,通过hash分配到reduce进行第一次聚合操作

第二个MR的分区键则是grouping key进行第二次聚合;(2个MR的sort key都是grouping key+distinct key)

https://issues.apache.org/jira/browse/HIVE-5118

hive.optimize.skewjoin=false

hive.optimize.skewjoin.compiletime=false

hive.skewjoin.key=100000

hive.skewjoin.mapjoin.map.tasks=10000

hive.skewjoin.mapjoin.min.split=33554432

3.mapreduce参数,是否开启map speculative

4.bucket table.

对于MAP/REDUCE的性能分析放到下一篇再说吧



本文转自MIKE老毕 51CTO博客,原文链接:http://blog.51cto.com/boylook/1316432,如需转载请自行联系原作者


相关文章
|
7月前
|
SQL 存储 Java
Hive 特殊的数据类型 Array、Map、Struct
在Hive中,`Array`、`Map`和`Struct`是三种特殊的数据类型。`Array`用于存储相同类型的列表,如`select array(1, &quot;1&quot;, 2, 3, 4, 5)`会产生一个整数数组。`Map`是键值对集合,键值类型需一致,如`select map(1, 2, 3, &quot;4&quot;)`会产生一个整数到整数的映射。`Struct`表示结构体,有固定数量和类型的字段,如`select struct(1, 2, 3, 4)`创建一个无名结构体。这些类型支持嵌套使用,允许更复杂的结构数据存储。例如,可以创建一个包含用户结构体的数组来存储多用户信息
559 0
|
存储 SQL HIVE
数据仓库的Hive的数据类型的复杂数据类型的map
在数据仓库领域,Hive是一个常用的工具。它提供了一种简单的方式来查询和分析大量数据。
173 0