文章目录
前言
一、离线数据的主要挑战:“数据倾斜”
二、Hive 的优化
三、Join 无关的优化
3.1 group by 引起的倾斜优化
3.2 count distinct 优化
四、大表 Join 小表优化
五、大表 Join 大表优化
5.1 问题场景
方案 1:转化为 mapjoin
方案 2:join 时用 case when 语句
方案 3:倍数B表,再取模join
方案 4:动态一分为二
前言
前面,我们陆陆续续聊过了 Hadoop原理实战、 Hive 的底层原理实践,今天就来聊一聊大家最关心的 Hive 优化实践。
实际搞过离线数据处理的同学都知道,Hive SQL 的各种优化方法都是和数据倾斜密切相关的,所以我会先来聊一聊 “数据倾斜” 的基本概念,然后再在此基础上为大家介绍各种场景下的 Hive 优化方案。
Hive 的优化分为 join 相关的优化 和 join 无关的优化 。
从项目实际来说, join 相关的优化其实占据了 Hive 优化的大部分内容,而 join 相关的优化又分为 mapjoin 可以解决的 join 优化和 mapjoin 无法解决的 join 优化。
一、离线数据的主要挑战:“数据倾斜”
首先介绍 “数据倾斜” 的概念。
“倾斜”应该来自于统计学里的的偏态分布,数据处理种的倾斜和此相关。
对于分布式数据处理来说,我们希望数据平均分布到每个处理节点,如下图所示:
但是实际上由于业务数据本身的问题或者分布算法的问题,每个节点分配到的数据量很可能并不是我们预想的那样,比如:
甚至还会出现更极端的情况:
也就是说,只有待分到最多数据的节点处理完数据,整个数据处理任务才能完成,时分布式的意义就大打折扣 ,想想那个卡死的 99% 。
实际上,即使每个节点分配到的数据量大致相同,数据仍可能倾斜,比如考虑统计词频的极端问题,如果某个节点分配到的词都是一个词,那么显此节点需要的耗时将很长,即使其数据量和其他节点的数据量相同。
Hive 的优化正是采用各种措施和方法对上述场景的倾斜问题进行优化和处理。
二、Hive 的优化
其实在实际 Hive SQL 开发的过程中, Hive SQL 性能的问题上实际只有一小部分和数据倾相关。
很多时候, Hive SQL 运行得慢是由开发人员对于使用的数据了解不够以及一些不良的使用习惯引起的。
开发人员 要确定以下几点:
需要计算的指标真的需要从数据仓库的公共明细层来自行汇总么? 是不是数据公共层团队开发的公共汇总层已经可以满足自己的需求?对于大众的、 KPI 相关的指标等通常设计良好的数据仓库公共层肯定已经包含了,直接使用即可。
真的需要扫描这么多分区么? 比如对于销售明细事务表来说,扫描一年的分区和扫描一周的分区所带来的计算、 IO 开销完全是两个量级,所耗费的时间肯定也是不同的。作为开发人员,我们需要仔细考虑业务的需求,尽量不要浪费计算和存储资源!
尽量不要使用 select * from your_table 这样的方式,用到哪些列就指定哪些列。 如 select coll, col2 from your_table ,另外, where 条件中也尽量添加过滤条件,以去掉无关的数据行,从而减少整个 MapReduce 任务中需要处理、分发的数据量。
输入文件不要是大量的小文件。 Hive 的默认 Input Split 是 128MB (可配置),小文件可先合并成大文件。
在保证了上述几点之后,有的时候发现 Hive SQL 还是要运行很长时间,甚至运行不出来, 这时就需要真正的 Hive 优化技术了!
三、Join 无关的优化
Hive SQL 性能问题基本上大部分都和 join 相关,对于和 join 无关的问题主要有 groupby 相关的倾斜和 count distinct 相关的优化。
3.1 group by 引起的倾斜优化
group by 引起的倾斜主要是输入数据行按照 group by 列分布不均匀 引起的。
比如,假设按照供应商对销售明细事实表来统计订单数,那么部分大供应商的订单量显然非常多,而多数供应商的订单量就一般,由于 group by 的时候是按照供应商的 ID 分发到每个 Reduce Task ,那么此时分配到大供应商的 Reduce Task 就分配了更多的订单,从而导致数据倾斜。
对于 group by 引起的倾斜,优化措施非常简单,只需设置下面参数即可:
set hive.map.aggr = true set hive.groupby.skewindata=true
此时 Hive 在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个 MapReduce Job。
第一个 MapReduce Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个Reduce 做部分聚合操作并输出结果。这样处理的结果是相同的 GroupBy Key 有可能被分布到不同的 Reduce 中,从而达到负载均衡的目的;
第二个 MapReduce Job 再根据预处理的数据结果,按照 GroupBy Key 分布到 Reduce 中(这过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
3.2 count distinct 优化
在 Hive 开发过程中,应该小心使用 count distinct ,因为很容易引起性能问题,比如下面的 SQL:
select count(distinct user) from some_table
由于必须去重,因此 Hive 将会把 Map 阶段的输出全部分布到 Reduce Task 上,此时很容易引起性能问题。对于这种情况,可以通过先 group by 再 count 的方式来优化,优化后的 SQL 如下:
select count(*) from ( select user from some_table group by user ) tmp;
原理为:先利用 group by 去重,再统计 group by 的行数目。
四、大表 Join 小表优化
join 相关的优化主要分为 mapjoin 可以解决的优化 ( 即大表 join 小表) 和 mapjoin 无法解决的优化( 即大表 join 大表 )。 大表 join 小表相对容易解决,大表 join 大表相对复杂和难以解决,但也不是不可解决的,只是相对比较麻烦而已。
首先介绍大表 join 小表优化 。仍以销售明细事实表为例来说明大表 join 小表的场景。
假如供应商会进行评级,比如(五星、四星、 两星、 一星),此时业务人员希望能够分析各供应商星级的每天销售情况及其占比。
开发人员一般会写出如下 SQL:
select Seller_srar, count(order_id) as ordre_cnt from ( select order_id,seller_id from dwd_sls_fact_detail_table where partition_value ='20170101' ) a Left outer join( select seller_id,seller_star from dim_seller where partition_value='20170101' ) b on a.seller_id = b.seller_id group by b.seller_star;
但正如上述所言,现实世界的二八准则将导致订单集中在部分供应商上,而好的供应商的评级通常会更高,此时更加剧了数据倾斜的程度。如果不加以优化,上述 SQL 将会耗费很长时间,甚至运行不出结果!
通常来说,供应商是有限的,比如上千家、上万家,数据量不会很大,而销售明细事实表比较大,这就是典型的大表 join 小表问题,可以通过 mapjoin 的方式来优化,只需添加 mapjoin hint 即可,优化后的 SQL 如下:
select /*+mapjoin(b)*/ Seller_srar, count(order_id) as ordre_cnt from ( select order_id,seller_id from dwd_sls_fact_detail_table where partition_value ='20170101' ) a Left outer join( select seller_id,seller_star from dim_seller where partition_value='20170101' ) b on a.seller_id = b.seller_id group by b.seller_star;
/*+mapjoin(b)*/ 即 mapjoin himt,如果需要 mapjoin 多个表,则格式为/*+mapjoin(b,c,d)*/ 。
Hive 对于 mapjoin 是默认开启的,设置参数为:
Set hive.auto.convert.join=ture;
mapjoin 优化是在 Map 阶段进行 join ,而不是像通常那样在 Reduce 阶段按照 join 列进行分发后在每个 Reduce 任务节点上进行 join ,不需要分发也就没有倾斜的问题,相反 Hive 会将小表全量复制到每个 Map 任务节点(对于本例是 dim_seller ,当然仅全量复制 b表 sql 指定的列),然后每个 Map 任务节点执行 lookup 小表即可。
从上述分析可以看出,小表不能太大,否则全量复制分发得不偿失。
实际上 Hive 根据参数 hive.mapjoin.smalltable.filesize ( 0.11.0 本后是 hive.auto.convert.join.noconditionaltask.size )来确定小表的大小是否满足条件(默认 25M)。
实际中此参数值所允许的最大值可以修改,但是一般最大不能超过 1GB (太大的话 Map 任务所在的节点内存会撑爆, Hive 会报错 。另外需要注意的是, HDFS 显示的文件大小是压缩后的大小, 当实际加载到内存的时候,容量会增大很多,很多场景下可能会膨胀 10 倍)。