union all 优化
利用hive对UNION ALL的优化的特性,hive对union all优化只局限于非嵌套查询。
示例:--3个JOB select * from (select ci,c2,c3 from t1 Group by c1,c2,c3 Union all Select c1,c2,c3 from t2 Group by c1,c2,c3 ) t3; --优化后的示例代码如下:--1个JOB select * from (select * from t1 Union all Select * from t2 ) t3 Group by c1,c2,c3;
不同表太多的union ALL,不推荐使用;
通常采用建临时分区表,将不同表的结果insert到不同的分区(可并行),最后再统一处理;
示例: INSERT overwrite TABLE lxw_test(flag = '1') SELECT sndaid,mobileFROM lxw_test1; INSERT overwrite TABLE lxw_test(flag = '2') SELECT sndaid,mobileFROM lxw_test2; INSERT overwrite TABLE lxw_test(flag = '3') SELECT sndaid,mobileFROM lxw_test3;
CBO优化
CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive 的成本优化器也一样。join 的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描
Hive 自 0.14.0 开始,加入了一项 "Cost based Optimizer" 来对 HQL 执行计划进行优化,这个功能通过 "hive.cbo.enable" 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以自动优化 HQL中多个 Join 的顺序,并选择合适的 Join 算法。
当你要做那种连结条件相同的表和表 join ,例如
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
如果你要使用这个功能,可以把下面的参数都打开即可
set hive.cbo.enable=true; set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true; set hive.stats.fetch.partition.stats=true;
合理设置 Map 及 Reduce 数
调整mapper数
mapper数量与输入文件的split数息息相关,在Hadoop源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述mapper数是如何确定的。
可以直接通过参数mapred.map.tasks(默认值2)来设定mapper数的期望值,但它不一定会生效,下面会提到。
设输入文件的总大小为total_input_size。HDFS中,一个块的大小由参数dfs.block.size指定,默认值64MB或128MB。在默认情况下,mapper数就是:
default_mapper_num = total_input_size / dfs.block.size。 参数mapred.min.split.size(默认值1B)和mapred.max.split.size(默认值64MB)分别用来指定split的最小和最大大小。split大小和split数计算规则是: split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size)); split_num = total_input_size / split_size。 得出mapper数: mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))。
可见,如果想减少mapper数,就适当调高mapred.min.split.size,split数就减少了。如果想增大mapper数,除了降低mapred.min.split.size之外,也可以调高mapred.map.tasks。
一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,得参考下面“合并小文件”一节的方法处理。
参数设置:
set mapred.map.tasks=10; ## 默认值是2
调整reducer数
reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks可以直接设定reducer数量,不像mapper一样是期望值。但如果不设这个参数的话,Hive就会自行推测,逻辑如下:
参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量,默认值1G(1.2版本之前)或256M(1.2版本之后)。
参数hive.exec.reducers.max用来设定每个job的最大reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)。
得出reducer数:
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。
// 每个 reduceTask 处理的最大数据大小 参数1:hive.exec.reducers.bytes.per.reducer (默认256M) // reduceTask 的个数上限 参数2:hive.exec.reducers.max (默认为1009) 参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数进行设置) reduceTask_num = Math.min(参数2,总输入数据大小 / 参数1)
合并小文件
小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:
设置map输入的小文件合并
set mapred.max.split.size=256000000; //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; //执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置map输出和reduce输出进行合并的相关参数:
//设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000
sql方式合并小文件
insert overwrite table test [partition(hour=...)] select * from tes
参数优化
set hive.optimize.countdistinct=true开启对count(distinct )的自动优化 set hive.auto.convert.join = true;开启自动mapjoin set hive.mapjoin.smalltable.filesize=26214400;大表小表的阈值设置(默认25M一下认为是小表) set hive.exec.parallel=true;打开任务并行执行 set hive.exec.parallel.thread.number=16;同一个sql允许最大并行度,默认值为8。默认情况下,Hive一次只会执行一个阶段。开启并行执行时会把一个sql语句中没有相互依赖的阶段并行去运行,这样可能使得整个job的执行时间缩短。提高集群资源利用率,不过这当然得是在系统资源比较空闲的时候才有优势,否则没资源,并行也起不来。 set hive.map.aggr=true;默认值是true,当选项设定为true时,开启map端部分聚合 set hive.groupby.skewindata = ture;默认值是false,当有数据倾斜的时候进行负载均衡,生成的查询计划有两个MapReduce任务,第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作 set hive.mapred.mode=strict;设置严格模式,默认值是nonstrict非严格模式。严格模式下会禁止以下3种类型不合理查询,即以下3种情况会报错 对于查询分区表,必须where加上分区限制条件 使用order by全局排序时,必须加上limit限制数据查询条数 限制了笛卡尔积查询 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;设置map端执行前合并小文件 set hive.exec.compress.output=true;设置hive的查询结果输出是否进行压缩 set mapreduce.output.fileoutputformat.compress=true;设置MapReduce Job的结果输出是否使用压缩 set hive.cbo.enable=false;关闭CBO优化,默认值true开启,可以自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法
数据倾斜优化
数据倾斜表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。
数据倾斜常见原因:
数据倾斜解决方案:
(a)参数调节:hive.map.aggr = true 在map端部分聚合,hive.groupby.mapaggr.checkinterval = 100000在 Map 端进行聚合操作的条目数目 (b)参数调节:hive.groupby.skewindata=true 数据倾斜时负载均衡 (c)sql语句调节:join时选择key值分布较均匀的表作为驱动表,同时做好列裁剪和分区裁剪,以减少数据量 (d)sql语句调节:大小表join时,小表先进内存 (e)参数条件:开启mapSide join (f)sql语句调节:大表join大表时,把key值为空的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,因此处理后不影响最终结果 (h)sql语句调节:大表join大表时,sort merge bucket join (i)sql语句调节:聚合计算依赖的 key 分布不均匀时就会发生数据倾斜 用两次 group by 代替 count distinct 不同指标的 count distinct 放到多段 SQL 中执行,执行后再 UNION 或 JOIN 合并 (j)参数条件:set hive.optimize.skewjoin = true; (k)参数条件:set hive.skewjoin.key = 250000000 (l) sql语句调节: group by维度过小:采用sum() ,group by的方式来替换count(distinct(字段名))完成计算。
常见调整参数:
set hive.exec.reducers.max=200; set mapred.reduce.tasks= 200;---增大Reduce个数 set hive.groupby.mapaggr.checkinterval=100000 ;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置 set hive.groupby.skewindata=true; --如果是group by过程出现倾斜 应该设置为true set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置 set hive.optimize.skewjoin=true;--如果是join 过程出现倾斜 应该设置为true set mapreduce.reduce.memory.mb=1024mb --调整reducer所需的执行内存 set hive.skewjoin.mapjoin.map.tasks=10000; set hive.skewjoin.mapjoin.min.split=33554432;
使用Hive元数据做监控
监控普通表存储的文件的平均大小
对于大的文件块可能导致数据在读取时产生数据倾斜,影响集群任务的运行效率。下面的代码是对大于两倍HDFS文件块大小的表。
--整体逻辑通过DBS找到对应库下面的表TBLS --再通过找到每个表对应的表属性,取得totalsize和numFiles两个属性,前者表示文件大小,后者表示文件数量 select tbl_name,avgfilesize'fileSize(MB)' from ( select tp.totalSize/(1024*1024)/numFiles avgfilesize, TBL NAME from DBS d /*DBS的主键DB_ID*/ inner join TBLS t on d.DB_ID=t.DB_ID left join( select TBL ID, /*每个表存储的文件个数*/ max(case PARAM_KEY when 'numFiles' then PARAM_VALUE else 0 end) numFiles, /*文件存储的大小*/ max(case PARAM KEY when 'totalSize' then PARAM_VALUE else 0 end) totalSize /*TABLE PARAMS 记录的表属性*/ from TABLE PARAMS GROUP BY TBL ID )tp on t.TBL ID=tp.TBL_ID where d.`NAME`='数据库名, and tp.numFiles is not null and tp.numFiles>0 )a where avgfilesize> hdfs的文件块大小*2
监控分区存储的文件平均大小
大于两倍HDFS文件块大小的分区,示例如下:
--先用DBS关联TBLS表,TBLS表关联PARTITIONS表 PARTITION 表关联 PARTITION PARAMS select tbl name,PART_NAME, avgfilesize'fileSize (MB) ' from ( select pp.totalSize/(1024*1024)/numFiles avgfilesize, TBL NAME, part.PART NAME from DBS d inner join TBLS t on d.DB ID=t.DB_ID inner join `PARTITIONS` part on t.TBL _ID=part.TBL_ID left join ( select PART_ID, /*每个表存储的文件个数*/ max(case PARAM_KEY when 'numFiles' then PARAM VALUE else O end) numFiles, /*文件存储的大小*/ max(case PARAM_KEY when 'totalSize' then PARAM_VALUE else O end) totalSize /*TABLE_PARAMS 记录的表属性*/ from PARTITION_PARAMS GROUP BY PART_ID )pp on part.PART_ID=pp.PART_ID where d.`NAME`='要监控的数据库名, and pp.numFiles is not null and pp.numFiles>0 ) a where avgfilesize> hdfs的文件块大小*2
监控大表不分区的表
对于大数据量的表,如果不进行分区,意味着程序在读取
相同的数据时需要遍历更多的文件块。下面是监控该示例的代码:
/*监控大表不分区的表*/ select t.TBL_NAME '表名',d.`NAME`'库名', totalSize/1024/1024/1024 '文件大小(GB)' from DBS d /*DBS的主键DB_ID*/ inner join TBLS t on d.DB_ID=t.DB_ID inner join ( select TBL ID, /*文件存储的大小*/ max(case PARAM_KEY when 'totalSize' then PARAM_VALUE else 0 end) totalSize /*TABLE_PARAMS 记录的表属性*/ from TABLEPARAMS GROUP BY TBL_ID )tp on t.TBL ID=tp.TBL ID left join( select distinct TBL ID from`PARTITIONS` ) part on t.TBL ID=part.TBL ID /*part.TBL_ID is null表示不存在分区*/ where d.`NAME`='需要监控的库名' and part.TBL ID is nu11 /*数据量大于30GB的表*/ and totalSize/1024/1024/1024>30
监控分区数据不均匀的表
分区不均匀的数据,可能意味着自己的分区列设计存在问题,或者某个分区的数据写入业务有调整,导致数据急速上升或者下跌,这时我们需要做特别的关注。监控的示例如下:
select TBL_NAME, max(totalSize), min(totalSize), avg(totalSize) from ( select pp.totalSize, TBL NAME, part.PART_NAME from DBS d inner join TBLS t on d.DB_ID=t.DB_ID inner join 'PARTITIONS' part on t.TBL_ID=part.TBL_ID inner join ( select PART_ID, /*文件存储的大小*/ max(case PARAM_KEY when 'totalSize' then PARAM_VALUE else 0 end)/1024/1024 totalSize /*TABLE_PARAMS 记录的表属性*/ from PARTITION_PARAMS GROUP BY PART_ID )pp on part.PART_ID=pp.PART_ID where d.`NAME`='default' and pp.totalSize is not null and pp.totalSize>0 ) a group by TBL_NAME having max(totalSize)>avg(totalSize)*5
并行执行与本地执行
- 并行执行Hive中互相没有依赖关系的job间是可以并行执行的,最典型的就是多个子查询union all。在集群资源相对充足的情况下,可以开启并行执行,即将参数
hive.exec.parallel
设为true。另外hive.exec.parallel.thread.number
可以设定并行执行的线程数,默认为8,一般都够用。 - 本地模式Hive也可以不将任务提交到集群进行运算,而是直接在一台节点上处理。因为消除了提交到集群的overhead,所以比较适合数据量很小,且逻辑不复杂的任务。 设置
hive.exec.mode.local.auto
为true可以开启本地模式。但任务的输入数据总量必须小于hive.exec.mode.local.auto.inputbytes.max
(默认值128MB),且mapper数必须小于hive.exec.mode.local.auto.tasks.max
(默认值4),reducer数必须为0或1,才会真正用本地模式执行。
严格模式
严格模式就是强制不允许用户执行3种有风险的HiveSQL语句,一旦执行会直接失败。这3种语句是:
- 查询分区表时不限定分区列的语句;
- 两表join产生了笛卡尔积的语句;
- 用order by来排序但没有指定limit的语句。
要开启严格模式,需要将参数hive.mapred.mode
设为strict。
JVM容器重用
在MR job中,默认是每执行一个task就启动一个JVM。如果task非常小而碎,那么JVM启动和关闭的耗时就会很长。可以通过调节参数mapred.job.reuse.jvm.num.tasks
来重用。例如将这个参数设成5,那么就代表同一个MR job中顺序执行的5个task可以重复使用一个JVM,减少启动和关闭的开销。但它对不同MR job中的task无效。