Hive 调优集锦,让 Hive 调优想法不再碎片化2

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: Hive 调优集锦,让 Hive 调优想法不再碎片化2


三、HQL语法和运行参数层面优化


3.1 查看Hive执行计划


Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务,因此需要了解具体的转换过程,可以在 SQL 语句中输入如下命令查看具体的执行计划。


查看执行计划,添加extended关键字可以查看更加详细的执行计划

explain [extended] query

3.2 列裁剪


列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销。


set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true


3.3 谓词下推


将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是PredicatePushDown

set hive.optimize.ppd=true; ## 默认是true


示例程序:

select a.*, b.* from a join b on a.id = b.id where b.age > 20;


select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;


3.4 分区裁剪


列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,

如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。

在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量


Hive 中与分区裁剪优化相关的则是:

set hive.optimize.pruner=true; ## 默认是true


在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。

select * from student where department = "AAAA";


3.5 合并小文件


如果一个mapreduce job碰到一对小文件作为输入,一个小文件启动一个Task


Map 输入合并


在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。

## Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
## Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;


Map/Reduce输出合并


大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。

## 是否合并Map输出文件, 默认值为true
set hive.merge.mapfiles=true;
## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfiles=true;
## 合并文件的大小,默认值为256000000
set hive.merge.size.per.task=256000000;
## 每个Map 最大分割大小
set mapred.max.split.size=256000000;
## 一个节点上split的最少值
set mapred.min.split.size.per.node=1; // 服务器节点
## 一个机架上split的最少值
set mapred.min.split.size.per.rack=1; // 服务器机架


hive.merge.size.per.task 和 mapred.min.split.size.per.node 联合起来

1、默认情况先把这个节点上的所有数据进行合并,如果合并的那个文件的大小超过了256M就开启另外一个文件继续合并
2、如果当前这个节点上的数据不足256M,那么就都合并成一个逻辑切片。


现在有 100 个Task,总共有 10000M 的数据, 平均一下,每个 Task 执行 100M 的数据的计算。假设只启动 10 个Task,每个Task就要执行1000M 的数据。 如果只有2个Task,5000M。


3.6 合理设置 Map Task 并行度



  • 第一:MapReduce 中的 MapTask 的并行度机制


Map 数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的 MapTask 都很多,这时候可以考虑减少 MapTask 的数量。增大每个 MapTask 处理的数据量。而且 MapTask 过多,最终生成的结果文件数也太多。


1、Map阶段输出文件太小,产生大量小文件

2、初始化和创建Map的开销很大


Map 数太小:当输入文件都很大,任务逻辑复杂,MapTask 执行非常慢的时候,可以考虑增加

MapTask 数,来使得每个 MapTask 处理的数据量减少,从而提高任务的执行效率。


1、文件处理或查询并发度小,Job执行时间过长

2、大量作业时,容易堵塞集群


一个 MapReduce Job 的 MapTask 数量是由输入分片 InputSplit 决定的。


而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask,而输入分片是由三个参数决定。


image.png


输入分片大小的计算是这么计算出来的

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))


默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率。


两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数。


1、减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源

2、增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数


重点注意:不推荐把这个值进行随意设置!


推荐的方式:使用默认的切块大小即可,如果非要调整,最好是切块的 N 倍数。


NodeManager节点个数:N ===》 Task = ( N * 0.95) * M


第二:合理控制 MapTask 数量

1、减少 MapTask 数可以通过合并小文件来实现

2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数


计算方式


输入文件总大小:total_size

HDFS 设置的数据块大小:dfs_block_size

default_mapper_num = total_size / dfs_block_size


MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。


set mapred.map.tasks=10; ## 默认值是 2


那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?


可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于dfs_block_size 的时候才会生效。


split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
compute_map_num = Math.min(split_num, Math.max(default_mapper_num,mapred.map.tasks))

这样就可以减少 MapTask 数量了。


总结一下控制 mapper 个数的方法。


1、如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值

2、如果想减少 MapTask 个数,可以设置 maperd.min.split.size 为一个较大的值

3、如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件


如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。


不能盲目进行暴力设置,不然适得其反。MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。


可以直接通过参数 mapred.map.tasks (默认值2)来设定 MapTask 数的期望值,但它不一定会生效。


3.7 合理设置 Reduce Task 并行度


如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费资源。


如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。


默认情况下,Hive 分配的 reducer 个数由下列参数决定:


Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。


如果 ReducerTask 数太多,会产生大量小文件,对HDFS造成压力。


如果 ReducerTask 数太少,每个 ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。


这使得Hive 怎样决定 ReducerTask 个数成为一个关键问题。


遗憾的是 Hive 的估计机制很弱,不指定 ReducerTask 个数的情况下,Hive 会猜测确定一个ReducerTask 个数,


基于以下两个设定:


参数1:hive.exec.reducers.bytes.per.reducer (默认256M)

参数2:hive.exec.reducers.max (默认为1009)

参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数进行设置)


ReduceTask 的计算公式为:


N = Math.min(参数2,总输入数据大小 / 参数1)


可以通过改变上述两个参数的值来控制 ReduceTask 的数量,也可以通过。


set mapred.map.tasks=10;
set mapreduce.job.reduces=10;

通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设参数2 还是必要的。


依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,NodeManage 和 DataNode 的个数是一样的。


3.8 Join 优化


Join优化整体原则:


1、优先过滤后再进行 join 操作,最大限度的减少参与 join 的数据量
2、小表 join 大表,最好启动 mapjoin,hive 自动启用 mapjoin, 小表不能超过25M,可以更改
3、Join on的条件相同的话,最好放入同一个job,并且 join 表的排列顺序从小到大:
  select a.*, b.*, c.* 
  from a join b 
  on a.id = b.id 
  join c 
  on a.id = c.i
4、如果多张表做 join, 如果多个链接条件都相同,会转换成一个JOb
  • 优先过滤数据


尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与 Join 的数据量。


  • 小表 join 大表原则


小表 join 大表的时应遵守小表 join 大表原则,原因是 join 操作的 reduce 阶段,位于 join 左边的表内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。join 中执行顺序是从左到右生成 Job,应该保证连续查询中的表的大小从左到右是依次增加的。


  • 使用相同的连接键


在 hive 中,当对 3 个或更多张表进行 join 时,如果 on 条件使用相同字段,那么它们会合并为一个 MapReduce Job,利用这种特性,可以将相同的 join on 放入一个 job 来节省执行时间。


  • 尽量原子操作


尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。


  • 大表 Join 大表


1、 空 key 过滤:有时 join 超时是因为某些 key 对应的数据太多,而相同key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在SQL语句中进行过滤。

2、 空 key 转换:有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表a中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。


3.9 启用 MapJoin


这个优化措施,但凡能用就用! 大表 join 小表 小表满足需求: 小表数据小于控制条件时。


MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。


## 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
## 对应逻辑优化器是MapJoinProcessor
set hive.auto.convert.join = true;
## 刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize = 25000000;
## hive会基于表的size自动的将普通join转换成mapjoin
set hive.auto.convert.join.noconditionaltask=true;
## 多大的表可以自动触发放到内层LocalTask中,默认大小10M
set hive.auto.convert.join.noconditionaltask.size=10000000;

Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表join的情况。


在Hive join场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。


Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存。


如果表顺序写反,probe table 在前面,引发 OOM 的风险就高了。


在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。


这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。


当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。


Hive 将JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。


如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。


也可以手动开启 mapjoin:


--SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)
--将小表放到内存中,省去shffle操作
// 在没有开启mapjoin的情况下,执行的是reduceJoin
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM
smallTable JOIN bigTable ON smallTable.key = bigTable.key;
/*+mapjoin(smalltable)*/
  • Sort-Merge-Bucket(SMB) Map Join


它是另一种 Hive Join 优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。分桶表的优化!


具体实现:


1、针对参与 join 的这两张做相同的 hash 散列,每个桶里面的数据还要排序

2、这两张表的分桶个数要成倍数。

3、开启 SMB join 的开关!


一些常见参数设置:


## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;
## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;
## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;
## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;

3.10 Join数据倾斜优化


在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:


# join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
# 如果是join过程出现倾斜应该设置为true
set hive.optimize.skewjoin=false;

如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。


通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认10000。


set hive.skewjoin.mapjoin.map.tasks=10000

3.11 CBO 优化


join 的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描。


select a., b., c.* 
from a 
join b on a.id = b.id 
join c on a.id = c.id;

Hive 自 0.14.0 开始,加入了一项 “Cost based Optimizer” 来对 HQL 执行计划进行优化,这个功能通过 “hive.cbo.enable” 来开启。


在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以 自动优化 HQL 中多个 Join 的顺序,并选择合适的 Join 算法。


CBO,成本优化器,代价最小的执行计划就是最好的执行计划。


传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的,Hive 的成本优化器也一样。


Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。

这些优化工作是交给底层来完成的。


根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:


set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

3.12 怎样做笛卡尔积


当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积,这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。


当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处理),这时 MapJoin 才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。这需要将 Join 操作的一个或多个表完全读入内存。


PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。


精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的,大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会出现数据倾斜。


3.13 Group By 优化


默认情况下,Map 阶段同一个 Key 的数据会分发到一个 Reduce 上,当一个 Key 的数据过大时会产生数据倾斜。进行 group by 操作时可以从以下两个方面进行优化。


Map 端部分聚合

事实上并不是所有的聚合操作都需要在 Reduce 部分进行,很多聚合操作都可以先在 Map 端进行部分聚合,然后在 Reduce 端的得出最终结果。


## 开启Map端聚合参数设置
set hive.map.aggr=true;
# 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000
set hive.groupby.mapaggr.checkinterval=100000


有数据倾斜时进行负载均衡


当 HQL 语句使用 group by 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行负载均衡。


策略就是把 MapReduce 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总。


# 自动优化,有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata=false;

当选项设定为 true 时,生成的查询计划有两个 MapReduce 任务。


1、在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 group by key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的;

2、第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中,最后完成最终的聚合操作


Map 端部分聚合:并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果,对应的优化器为 GroupByOptimizer。


那么如何用 group by 方式同时统计多个列?


select t.a, sum(t.b), count(t.c), count(t.d) from some_table t group by t.a;


下面是解决方法:


select t.a, sum(t.b), count(t.c), count(t.d) from (
  select a, b, null c,null d 
  from some_table
  union all
  select a,0 b, c,null d    
  from some_table group by a,c
  union all
  select a,0 b,null c,d    
  from some_table group by a,d
) t;


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
6月前
|
SQL 大数据 HIVE
Hive 任务调优实践总结
Hive 任务调优实践总结
59 0
|
2月前
|
SQL 数据处理 HIVE
HIVE的数据倾斜调优
hive数据倾斜主要是由shuffle引起的,而引起shuffle的又主要有四种情况,分别为: 1.group by 2.join 3.count(distinct) 4.开窗函数
62 8
|
6月前
|
SQL 分布式计算 Java
bigdata-24-Hive调优
bigdata-24-Hive调优
36 0
|
SQL 存储 JSON
Hive学习---7、企业级调优(二)
Hive学习---7、企业级调优(二)
|
SQL 缓存 JSON
Hive学习---7、企业级调优(一)
Hive学习---7、企业级调优(一)
|
SQL 存储 负载均衡
工作常用之Hive 调优【四】HQL 语法优化
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
292 0
工作常用之Hive 调优【四】HQL 语法优化
|
存储 SQL 分布式计算
工作常用之Hive 调优【三】 Explain 查看执行计划及建表优化
在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多,所以我们需要把常常用在 WHERE 语句中的字段指定为表的分区字段。
359 0
工作常用之Hive 调优【三】 Explain 查看执行计划及建表优化
|
SQL 分布式计算 负载均衡
如何从语法与参数层面对Hive进行调优
作为企业Hadoop应用的核心产品,Hive承载着FaceBook、淘宝等大佬95%以上的离线统计,很多企业里的离线统计甚至全由Hive完成,如电商、金融等行业。Hive在企业云计算平台发挥的作用和影响愈来愈大。因此,如何优化提速已经显得至关重要。
|
SQL 缓存 负载均衡
|
SQL 缓存 负载均衡
Hive调优
Hive调优
206 0