hive性能优化小结(1)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,8核32GB 100GB 1个月
简介: hive性能优化小结(1)

优化的根本思想

  • 尽早尽量过滤数据,减少每个阶段的数据量
  • 减少job数
  • 解决数据倾斜问题

60a6bcefe26f4b118e50f46e4d0afd1d.png

  • 常见优化整理:

  • 列裁剪和分区裁剪
  • 谓词下推(PPD

  • 合理选择排序

  • group by代替distinct
  • job优化

  • 表join的优化
  • 合理选择文件存储格式和压缩方式
  • union all优化
  • CBO优化
  • 合理设置Map和Reduce的个数

  • 解决小文件过多问题
  • 参数调数

  • 解决数据倾斜问题

  • 并行执行与本地模式

  • 严格模式
  • JVM重用

列裁剪与分区裁剪

  • 这其实就是查询时只读取需要的列,分区裁剪就是只读取需要的分区。所以尽量不要 select * ,还有就是要指定分区
  • Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销。
set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true
set hive.optimize.pruner=true; ## 分区裁剪 默认是true

在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器

谓词下推

指的是在不影响数据结果的情况下,将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据,这样在map执行过滤条件,可以减少map端数据输出,起到了数据收敛的作用,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。

一句话讲:将SQL语句中的where谓词逻辑从reduce端提前到map端执行,减少下游处理的数据量

set hive.optimize.ppd=true;   ## 默认是true
# 谓词不下推示例
select a.*, b.* from a join b on a.id = b.id where b.age > 20;
# 谓词下推示例
// 这条就预先把 b 表的满足 age>20 的条件先筛选出来为一个c表再进行聚合
// 这是一种手动谓词下推
select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;

选择合理的排序

  • order by
    全局排序,只走一个reducer,当表数据量较大时容易计算不出来,性能不佳慎用,在严格模式下需要加limit
  • sort by
    局部排序,即保证单个reduce内结果有序,但没有全局排序的能力。

    把map端随机分发给reduce端执行,如果是要实现全局排序且走多个reducer的优化需求时,可以在外层嵌套一层,例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N,这样就有2个Job,一个是内层的局部排序,一个是外层的归并全局排序
  • distribute by
    按照指定的字段把数据划分输出到不同的reducer中,是控制数据如何从map端输出到reduce端,hive会根据distribute by后面的字段和对应reducer的个数进行hash分发
  • cluster by
    拥有distrubute by的能力,同时也拥有sort by的能力,
    所以可以理解cluster by是 distrubute by+sort by

以下举个排序方式优化案例,取用户信息表(10亿数据量)中年龄排前100的用户信息以下案例实现也体现了一个大数据思想,分而治之,大job拆分小job。

-- 原脚本
select *
from tmp.user_info_table
where dt = '2022-07-04'
order by age -- 全局排序,只走一个reduce
limit 100
;
-- 优化脚本
set mapred.reduce.tasks=50; -- 设置reduce个数为50
select *
from tmp.user_info_table
where dt = '2022-07-04'
distribute by (case when age<20 then 0
        when age >=20 and age <= 40 then 1
        else 2
    end
) -- distribute by主要是为了控制map端输出的数据在reduce端中是如何划分的,防止map端数据随机分配到reduce。这里字段做case when判断是因为用户年龄的零散值会导致分布不均匀,起太多reduce本身也耗时浪费资源
sort by age -- 起多个reduce排序,保证单个reduce结果有序
limit 100 -- 取前100,因为是按照年龄局部排序过,所以前100个也一定是年龄最小的
;

如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。

count(distinct ) 和 group by

count(distinct)逻辑只会有很少的reducer来处理。这时可以用count(列名)+group by来改写。

但是这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启动job的overhead远小于计算耗时,才考虑这种方法。

当数据集很小或者key的倾斜比较明显时,group by还可能会比distinct慢。

那么如何用group by方式同时统计多个列?下面是解决方法:group by +union all

select t.a,sum(t.b),count(t.c),count(t.d) from (
select a,b,null c,null d from some_table
union all
select a,0 b,c,null d from some_table group by a,c
union all
select a,0 b,null c,d from some_table group by a,d
) t;

group by配置调整

# map端预聚合

group by时,如果先起一个combiner在map端做部分预聚合,可以有效减少shuffle数据量。预聚合的配置项是hive.map.aggr,默认值true,对应的优化器为GroupByOptimizer,简单方便。

通过hive.groupby.mapaggr.checkinterval参数也可以设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000。

# 倾斜均衡配置项

group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项hive.groupby.skewindata,默认值false。

其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。

但是,配置项毕竟是死的,单纯靠它有时不能根本上解决问题,因此还是建议自行了解数据倾斜的细节,并优化查询语句。

job优化

# 减少job数

不论是外关联outer join还是内关联inner join,如果Join的key相同,不管有多少个表,都会合并为一个MapReduce任务。

示例1:1个JOB
SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key1 )
示例2:2个JOB
SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key2)

# JOB输入输出优化

善用muti-insert、union all不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。

示例:
insert overwrite table tmp1
select ... from a where 条件1;
insert overwrite table tmp2
select ... from a where 条件2;
--优化后的示例代码如下:
from a
insert overwrite table tmp1
select ... where 条件1
insert overwrite table tmp2
select ... where 条件2;

表join优化

小表大表 Join

build table(小表)前置      小表进内存。

将 key 相对分散,并且数据量小的表放在 join 的左边,可以使用 map join 让小的维度表
先进内存。在 map 端完成 join。
实际测试发现:新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放
在左边和右边已经没有区别

多表join

多表joins时根据相同的key关联            会将多个join合并为一个MR job来处理

利用map join特性

设置自动选择 Mapjoin

set hive.auto.convert.join = true; 默认为 true

大表小表的阈值设置(默认 25M 以下认为是小表)

set hive.mapjoin.smalltable.filesize = 25000000;

map join工作原理

60a6bcefe26f4b118e50f46e4d0afd1d.png

大表join 大表(sort merge bucket join)
分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决之,这是典型的
分而治之的思想。

思想:首先进行排序,继而合并,然后放到所对应的bucket中去,bucket是hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去。在进行两个表联合的时候。我们首先进行分桶,在join会大幅度的对性能进行优化。也就是说,在进行联合的时候,是table1中的一小部分和table1中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描。

 限制条件:

针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
2、这两张表的分桶个数要成倍数。

具体操作:

1) 首先设置如下:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;//正常的情况下,应该是启动smbjoin的但是这里的数据量太小啦,还是启动了mapjoin
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
2) 小表的bucket数=大表bucket数 
3) Bucket 列 == Join 列 == sort 列 
## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;
## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;
## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;
## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;

注意:

hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表,否则可能数据不正确。有两个办法

1)hive.enforce.sorting 设置为true。
2)手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1 
表创建时必须是CLUSTERED且SORTED,如下 
create table test_smb_2(mid string,age_id string) 
CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;

大表join 大表(空key过滤)

有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同

的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,

这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为

空,操作如下

insert overwrite table jointable select n.* from (select 
* from nullidtable where id is not null) n left join bigtable o on n.id = 
o.id;

大表join 大表(空key转换)

有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在

join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地

分不到不同的 reducer 上。例如

- 做空key转换优化时的hql,利用case when判断加随机数
select a.id 
from a.left join b
on case when a.id is null then concat('hive'+rand()) else a.id end = b.id
不同数据类型
这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。
举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是(event_type int),新表的是(event_type string)。为了兼容旧版记录,新表的event_type也会以字符串形式存储旧版的值,比如'17'。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换:
select a.uid,a.event_type,b.record_data
from calendar_record_log a
left outer join (
select uid,event_type from calendar_record_log_2
where pt_date = 20190228
) b on a.uid = b.uid and b.event_type = cast(a.event_type as string)
where a.pt_date = 20190228;

60a6bcefe26f4b118e50f46e4d0afd1d.png

文件存储格式

hive主要有textfile、sequencefile、orc、parquet 这四种存储格式,其中sequencefile很少使用,常见的主要就是orc和parquet这两种,往往也搭配着压缩方式合理使用。

建表声明语句是:stored as textfile/orc/parquet
01Textfile

行式存储,这是hive表的默认存储格式,默认不做数据压缩,磁盘开销大,数据解析开销大,数据不支持分片(即代表着会带来无法对数据进行并行操作)

02Orc

行列式存储,将数据按行分块,每个块按列存储,其中每个块都存储着一个索引,支持none和zlib和snappy这3种压缩方式,默认采用zlib压缩方式,不支持切片,orc存储格式能提高hive表的读取写入和处理的性能。

03Parquet

列式存储,是一个面向列的二进制文件格式(不可直接读取),文件中包含数据和元数据,所以该存储格式是自解析的,在大型查询时效率很快高效,parquet主要用在存储多层嵌套式数据上提供良好的性能支持,默认采用uncompressed不压缩方式。

压缩方式

hive主要支持gzip、zlib、snappy、lzo 这四种压缩方式。
压缩不会改变元数据的分割性,即压缩后原来的值不变。

建表声明语句是:tblproperties("orc.compress"="SNAPPY")

压缩方式的评判标准主要有以下几点                                                  

  • 压缩比,越高越好
  • 压缩时间,越快越好
  • 已经压缩后是否可以再分割,可分割的话则可以为多个mapper程序并行处理,提高大数据分布式计算并行度

针对压缩方式做一个小结对比

  • 压缩率的话:gzip压缩率最佳,但压缩解压缩速度较慢
  • 压缩速度的话:snappy压缩解压缩速度最佳,但压缩率较低
  • 是否可切片的话:gzip/snappy/zlib是不支持切片,而lzo支持切片



适场景而选定压缩方式

  1. 数据量极其大且不经常用来做计算的数据,可采用GZip,因为其压缩占比最高,但压缩解压缩速度最慢。
  2. 数据量不大且经常需要用来计算的数据,可采用Snappy或者Lzo,常常还用来搭配orc和parquet存储格式实现大幅度的数据压缩存储。

适场景而选定存储格式

  1. hive生产环境下时常是采用orc或者parquet这2种存储格式,但最好是做好统一,别一个数仓里的表存储格式百花齐放
  2. 我建议就是数仓各层统一采用orc存储格式,拥有一定的压缩率且压缩解压缩速度也适中
  3. orc存储格式默认搭配的zlib压缩方式适合用作数仓ODS层表设计,因为这层一般是业务贴源层来入库数据和备份,查询频率打不大,而orc存储格式搭配snappy压缩方式适合用作数仓DW层表设计,这公共层表一般查询较频繁,所以要考虑下查询时解压缩速度
  4. 一般数据量预测会很大的话才不选用orc存储格式,主要是为了避免map端数据倾斜,因为orc+snappy不支持分割文件操作,所以压缩文件只会被一个任务读取,压缩文件很大的话就会造成mapper处理该文件极其耗时,这就是所谓的map读取文件出现数据倾斜
压缩方式一般选择Snappy,效率最高。要启用中间压缩,需要设定hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。另外,参数hive.intermediate.compression.type可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高。输出压缩的配置基本相同,打开hive.exec.compress.output即可
Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。我们选择Parquet的原因主要是它支持Impala查询引擎,并且我们对update、delete和事务性操作需求很低。

结论,一般选择orcfile/parquet + snappy 方式


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
10月前
|
SQL 分布式计算 监控
Hive性能优化之计算Job执行优化 2
Hive性能优化之计算Job执行优化
122 1
|
10月前
|
SQL 存储 分布式计算
Hive性能优化之表设计优化1
Hive性能优化之表设计优化1
51 1
|
2月前
|
SQL 存储 分布式计算
Hive的性能优化有哪些方法?请举例说明。
Hive的性能优化有哪些方法?请举例说明。
93 0
|
2月前
|
SQL 存储 关系型数据库
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
228 0
|
10月前
|
SQL 存储 分布式计算
Hive性能优化之表设计优化2
Hive性能优化之表设计优化2
63 1
|
10月前
|
SQL 分布式计算 资源调度
Hive性能优化之计算Job执行优化 1
Hive性能优化之计算Job执行优化
108 0
Hive性能优化之计算Job执行优化 1
|
9月前
|
SQL 分布式计算 大数据
分享一个 HIVE SQL 性能优化点-使用公共表表达式 CTE 替换临时表
分享一个 HIVE SQL 性能优化点-使用公共表表达式 CTE 替换临时表
|
SQL 存储 分布式计算
Hive企业级性能优化
Hive作为大数据平台举足轻重的框架,以其稳定性和简单易用性也成为当前构建企业级数据仓库时使用最多的框架之一。
312 0
Hive企业级性能优化
|
SQL 存储 分布式计算
hive性能优化小结(2)
hive性能优化小结(2)
hive性能优化小结(2)
|
SQL 分布式计算 负载均衡
Hive性能优化(全面)
Hadoop的计算框架特性下的HIve有效的优化手段
Hive性能优化(全面)