优化的根本思想
- 尽早尽量过滤数据,减少每个阶段的数据量
- 减少job数
- 解决数据倾斜问题
- 常见优化整理:
- 列裁剪和分区裁剪
- 谓词下推(PPD)
- 合理选择排序
- group by代替distinct
- job优化
- 表join的优化
- 合理选择文件存储格式和压缩方式
- union all优化
- CBO优化
- 合理设置Map和Reduce的个数
- 解决小文件过多问题
- 参数调数
- 解决数据倾斜问题
- 并行执行与本地模式
- 严格模式
- JVM重用
列裁剪与分区裁剪
- 这其实就是查询时只读取需要的列,分区裁剪就是只读取需要的分区。所以尽量不要 select * ,还有就是要指定分区
- Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销。
set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true set hive.optimize.pruner=true; ## 分区裁剪 默认是true
在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器
谓词下推
指的是在不影响数据结果的情况下,将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据,这样在map执行过滤条件,可以减少map端数据输出,起到了数据收敛的作用,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。
一句话讲:将SQL语句中的where谓词逻辑从reduce端提前到map端执行,减少下游处理的数据量
set hive.optimize.ppd=true; ## 默认是true # 谓词不下推示例 select a.*, b.* from a join b on a.id = b.id where b.age > 20; # 谓词下推示例 // 这条就预先把 b 表的满足 age>20 的条件先筛选出来为一个c表再进行聚合 // 这是一种手动谓词下推 select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
选择合理的排序
- order by
全局排序,只走一个reducer
,当表数据量较大时容易计算不出来,性能不佳慎用,在严格模式下需要加limit - sort by
局部排序,即保证单个reduce内结果有序,但没有全局排序的能力。
把map端随机分发给reduce端执行,如果是要实现全局排序且走多个reducer的优化需求时,可以在外层嵌套一层,例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N
,这样就有2个Job,一个是内层的局部排序,一个是外层的归并全局排序 - distribute by
按照指定的字段把数据划分输出到不同的reducer中,是控制数据如何从map端输出到reduce端
,hive会根据distribute by后面的字段和对应reducer的个数进行hash分发 - cluster by
拥有distrubute by的能力,同时也拥有sort by的能力,所以可以理解cluster by是 distrubute by+sort by
以下举个排序方式优化案例,取用户信息表(10亿数据量)中年龄排前100的用户信息:
以下案例实现也体现了一个大数据思想,分而治之,大job拆分小job。
-- 原脚本 select * from tmp.user_info_table where dt = '2022-07-04' order by age -- 全局排序,只走一个reduce limit 100 ; -- 优化脚本 set mapred.reduce.tasks=50; -- 设置reduce个数为50 select * from tmp.user_info_table where dt = '2022-07-04' distribute by (case when age<20 then 0 when age >=20 and age <= 40 then 1 else 2 end ) -- distribute by主要是为了控制map端输出的数据在reduce端中是如何划分的,防止map端数据随机分配到reduce。这里字段做case when判断是因为用户年龄的零散值会导致分布不均匀,起太多reduce本身也耗时浪费资源 sort by age -- 起多个reduce排序,保证单个reduce结果有序 limit 100 -- 取前100,因为是按照年龄局部排序过,所以前100个也一定是年龄最小的 ;
如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。
count(distinct ) 和 group by
count(distinct)逻辑只会有很少的reducer来处理。这时可以用count(列名)+group by来改写。
但是这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启动job的overhead远小于计算耗时,才考虑这种方法。
当数据集很小或者key的倾斜比较明显时,group by还可能会比distinct慢。
那么如何用group by方式同时统计多个列?下面是解决方法:group by +union all
select t.a,sum(t.b),count(t.c),count(t.d) from ( select a,b,null c,null d from some_table union all select a,0 b,c,null d from some_table group by a,c union all select a,0 b,null c,d from some_table group by a,d ) t;
group by配置调整
# map端预聚合
group by时,如果先起一个combiner在map端做部分预聚合,可以有效减少shuffle数据量。预聚合的配置项是hive.map.aggr,默认值true,对应的优化器为GroupByOptimizer,简单方便。
通过hive.groupby.mapaggr.checkinterval参数也可以设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000。
# 倾斜均衡配置项
group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项hive.groupby.skewindata,默认值false。
其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。
但是,配置项毕竟是死的,单纯靠它有时不能根本上解决问题,因此还是建议自行了解数据倾斜的细节,并优化查询语句。
job优化
# 减少job数
不论是外关联outer join还是内关联inner join,如果Join的key相同,不管有多少个表,都会合并为一个MapReduce任务。
示例1:1个JOB SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key= b.key1) JOIN c ON (c.key= b.key1 ) 示例2:2个JOB SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key= b.key1) JOIN c ON (c.key= b.key2)
# JOB输入输出优化
善用muti-insert、union all,不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。
示例: insert overwrite table tmp1 select ... from a where 条件1; insert overwrite table tmp2 select ... from a where 条件2; --优化后的示例代码如下: from a insert overwrite table tmp1 select ... where 条件1 insert overwrite table tmp2 select ... where 条件2;
表join优化
小表大表 Join
build table(小表)前置 小表进内存。
将 key 相对分散,并且数据量小的表放在 join 的左边,可以使用 map join 让小的维度表 先进内存。在 map 端完成 join。 实际测试发现:新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放 在左边和右边已经没有区别
多表join
多表joins时根据相同的key关联 会将多个join合并为一个MR job来处理
利用map join特性
设置自动选择 Mapjoin
set hive.auto.convert.join = true; 默认为 true
大表小表的阈值设置(默认 25M 以下认为是小表)
set hive.mapjoin.smalltable.filesize = 25000000;
map join工作原理
大表join 大表(sort merge bucket join)
分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决之,这是典型的分而治之的思想。
思想:首先进行排序,继而合并,然后放到所对应的bucket中去,bucket是hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去。在进行两个表联合的时候。我们首先进行分桶,在join会大幅度的对性能进行优化。也就是说,在进行联合的时候,是table1中的一小部分和table1中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描。
限制条件:
针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序 2、这两张表的分桶个数要成倍数。
具体操作:
1) 首先设置如下: set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin = true;//正常的情况下,应该是启动smbjoin的但是这里的数据量太小啦,还是启动了mapjoin set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.auto.convert.sortmerge.join.noconditionaltask=true; 2) 小表的bucket数=大表bucket数 3) Bucket 列 == Join 列 == sort 列 ## 当用户执行bucket map join的时候,发现不能执行时,禁止查询 set hive.enforce.sortmergebucketmapjoin=false; ## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join set hive.auto.convert.sortmerge.join=true; ## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用mapjoin 来提高效率。 # bucket map join优化,默认值是 false set hive.optimize.bucketmapjoin=false; ## bucket map join 优化,默认值是 false set hive.optimize.bucketmapjoin.sortedmerge=false;
注意:
hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表,否则可能数据不正确。有两个办法
1)hive.enforce.sorting 设置为true。 2)手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1 表创建时必须是CLUSTERED且SORTED,如下 create table test_smb_2(mid string,age_id string) CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;
大表join 大表(空key过滤)
有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同
的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,
这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为
空,操作如下
insert overwrite table jointable select n.* from (select * from nullidtable where id is not null) n left join bigtable o on n.id = o.id;
大表join 大表(空key转换)
有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在
join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地
分不到不同的 reducer 上。例如
- 做空key转换优化时的hql,利用case when判断加随机数 select a.id from a.left join b on case when a.id is null then concat('hive'+rand()) else a.id end = b.id
不同数据类型
这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。
举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是(event_type int),新表的是(event_type string)。为了兼容旧版记录,新表的event_type也会以字符串形式存储旧版的值,比如'17'。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换:
select a.uid,a.event_type,b.record_data from calendar_record_log a left outer join ( select uid,event_type from calendar_record_log_2 where pt_date = 20190228 ) b on a.uid = b.uid and b.event_type = cast(a.event_type as string) where a.pt_date = 20190228;
文件存储格式
hive主要有textfile、sequencefile、orc、parquet 这四种存储格式,其中sequencefile很少使用,常见的主要就是orc和parquet这两种,往往也搭配着压缩方式合理使用。
建表声明语句是:stored as textfile/orc/parquet
01Textfile
行式存储,这是hive表的默认存储格式,默认不做数据压缩,磁盘开销大,数据解析开销大,数据不支持分片(即代表着会带来无法对数据进行并行操作)
02Orc
行列式存储,将数据按行分块,每个块按列存储,其中每个块都存储着一个索引,支持none和zlib和snappy这3种压缩方式,默认采用zlib压缩方式,不支持切片,orc存储格式能提高hive表的读取写入和处理的性能。
03Parquet
列式存储,是一个面向列的二进制文件格式(不可直接读取),文件中包含数据和元数据,所以该存储格式是自解析的,在大型查询时效率很快高效,parquet主要用在存储多层嵌套式数据上提供良好的性能支持,默认采用uncompressed不压缩方式。
压缩方式
hive主要支持gzip、zlib、snappy、lzo 这四种压缩方式。
压缩不会改变元数据的分割性,即压缩后原来的值不变。
建表声明语句是:tblproperties("orc.compress"="SNAPPY")
压缩方式的评判标准主要有以下几点
- 压缩比,越高越好
- 压缩时间,越快越好
- 已经压缩后是否可以再分割,可分割的话则可以为多个mapper程序并行处理,提高大数据分布式计算并行度
针对压缩方式做一个小结对比
- 压缩率的话:gzip压缩率最佳,但压缩解压缩速度较慢
- 压缩速度的话:snappy压缩解压缩速度最佳,但压缩率较低
- 是否可切片的话:gzip/snappy/zlib是不支持切片,而lzo支持切片
适场景而选定压缩方式
- 数据量极其大且不经常用来做计算的数据,可采用GZip,因为其压缩占比最高,但压缩解压缩速度最慢。
- 数据量不大且经常需要用来计算的数据,可采用Snappy或者Lzo,常常还用来搭配orc和parquet存储格式实现大幅度的数据压缩存储。
适场景而选定存储格式
- hive生产环境下时常是采用orc或者parquet这2种存储格式,但最好是做好统一,别一个数仓里的表存储格式百花齐放
- 我建议就是数仓各层统一采用orc存储格式,拥有一定的压缩率且压缩解压缩速度也适中
- orc存储格式默认搭配的zlib压缩方式适合用作数仓ODS层表设计,因为这层一般是业务贴源层来入库数据和备份,查询频率打不大,而orc存储格式搭配snappy压缩方式适合用作数仓DW层表设计,这公共层表一般查询较频繁,所以要考虑下查询时解压缩速度
- 一般数据量预测会很大的话才不选用orc存储格式,主要是为了避免map端数据倾斜,因为orc+snappy不支持分割文件操作,所以压缩文件只会被一个任务读取,压缩文件很大的话就会造成mapper处理该文件极其耗时,这就是所谓的map读取文件出现数据倾斜
压缩方式一般选择Snappy,效率最高。要启用中间压缩,需要设定hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。另外,参数hive.intermediate.compression.type可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高。输出压缩的配置基本相同,打开hive.exec.compress.output即可 Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。我们选择Parquet的原因主要是它支持Impala查询引擎,并且我们对update、delete和事务性操作需求很低。
结论,一般选择orcfile/parquet + snappy 方式