3.4 Bucket Join
⚫ 应用场景
适合于大表Join大表
⚫ 原理
◼ 将两张表按照相同的规则将数据划分,根据对应的规则的数据进行join,减少了比较次数,
提高了性能
⚫ 使用
◼ Bucket Join
语法:clustered by colName
参数
– 开启分桶 join
set hive.optimize.bucketmapjoin = true;
要求
分桶字段 = Join字段 ,桶的个数相等或者成倍数
◼ Sort Merge Bucket Join(SMB):基于有序的数据Join
语法:clustered by colName sorted by (colName)
参数
– 开启分桶 SMB join
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join= true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask= true;
要求
分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数
4 优化器
4.1 关联优化
在使用Hive的过程中经常会遇到一些特殊的问题,例如当一个程序中如果有一些操作彼此之间
有关联性,是可以放在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个
MapReduce来完成这两个操作。
例如:当我们执行以下SQL语句:
select …… from table group by id order by id desc;
该SQL语句转换为MapReduce时,我们可以有两种方案来实现:
⚫ 方案一
◼ 第一个MapReduce做group by,经过shuffle阶段对id做分组
◼ 第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id
进行排序
⚫ 方案二
◼ 因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序
在这种场景下,Hive会默认选择用第一种方案来实现,这样会导致性能相对较差,我们可以在
Hive中开启关联优化,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现。
⚫ 配置
set hive.optimize.correlation= true;
4.2 CBO优化器引擎
在使用MySQL或者Hive等工具时,我们经常会遇到一个问题,默认的优化器在底层解析一些
聚合统计类的处理的时候,底层解析的方案有时候不是最佳的方案。
例如:当前有一张表【共1000条数据】,id构建了索引,id =100值有900条,我们现在的需
求是查询所有id = 100的数据,所以SQL语句为:select * from table where id = 100;
由于id这一列构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 100的值
所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。有id=100
的值有900条,占了总数据的90%,这时候是没有必要检索索引以后再检索数据的,可以直接检索
数据返回,这样的效率会更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。
使用Hive时,Hive中也支持RBO与CBO这两种引擎,默认使用的是RBO优化器引擎。
⚫ RBO
◼ rule basic optimise:基于规则的优化器
◼ 根据设定好的规则来对程序进行优化
⚫ CBO
◼ cost basic optimise:基于代价的优化器
◼ 根据不同场景所需要付出的代价来合适选择优化的方案
◼ 对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案
是最佳方案
很明显CBO引擎更加智能,所以在使用Hive时,我们可以配置底层的优化器引擎为CBO引擎。
⚫ 配置
set hive.cbo.enable= true;
set hive.compute.query.using.stats= true;
set hive.stats.fetch.column.stats= true;
set hive.stats.fetch.partition.stats= true;
⚫ 要求
◼ 要想使用CBO引擎,必须构建数据的元数据【表行数、列的信息、分区的信息……】
◼ 提前获取这些信息,CBO才能基于代价选择合适的处理计划
◼ 所以CBO引擎一般搭配analyze分析优化器一起使用
4.3 Analyze分析优化器
⚫ 功能
用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、
分区信息、列的信息】,搭配CBO引擎一起使用
⚫ 语法
– 构建分区信息元数据
ANALYZE TABLE tablename
[ PARTITION(partcol1[=val1], partcol2[=val2], …)]
COMPUTE STATISTICS [noscan];
– 构建列的元数据
ANALYZE TABLE tablename
[ PARTITION(partcol1[=val1], partcol2[=val2], …)]
COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2…) [noscan];
– 查看元数据
DESC FORMATTED [tablename] [columnname];
⚫ 举例
◼ 构建表中分区数据的元数据信息
ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;
◼ 构建表中列的数据的元数据信息
ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;
◼ 查看构建的列的元数据
desc formatted tb_login_part userid;
5 谓词下推(PPD)
5.1 基本思想
谓词下推 Predicate Pushdown(PPD)的思想简单点说就是在不影响最终结果的情况下,尽
量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据
在集群上传输的量,降低了Reduce端的数据负载,节约了集群的资源,也提升了任务的性能。
5.2 基本规则
⚫ 开启参数
– 默认自动开启谓词下推
hive.optimize.ppd= true;
⚫ 不同Join场景下的Where谓词下推测试
⚫ 试验结论
◼ Inner Join和Full outer Join,条件写在on后面,还是where后面,性能上面没有区别
◼ Left outer Join时 ,右侧的表写在on后面,左侧的表写在where后面,性能上有提高
◼ Right outer Join时,左侧的表写在on后面、右侧的表写在where后面,性能上有提高
◼ 如果SQL语句中出现不确定结果的函数,也无法实现下推
6 数据倾斜
6.1 数据倾斜的现象
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运
行一个程序时,我们通过监控发现,这个程序的大多数的Task都已经运行结束了,只有某一个Task
一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候我们就可以判定程序出
现了数据倾斜的问题。
6.2 数据倾斜的原因
表面上看,发生数据倾斜的原因在于这个Task运行过慢,但是仔细分析我们会发现,这个Task
运行过慢的原因在于这个Task的负载要比其他Task的负载要高,所以发生数据倾斜的直观原因在于
Task的数据分配不均衡。
那为什么会出现多个Task数据分配不均衡的情况呢?
从两方面考虑,第一:数据本身就是倾斜的,数据中某种数据出现的次数过多。第二:分区规
则导致这些相同的数据都分配给了同一个Task,导致这个Task拿到了大量的数据,而其他Task拿到
的数据比较少,所以运行起来相比较于其他Task就比较慢一些。
综上所述,产生数据倾斜的根本原因在于分区规则。
6.3 group By的数据倾斜
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根
据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,
所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。
⚫ 方案一:开启Map端聚合
– 开启 Map 端聚合:Combiner
hive.map.aggr= true;
◼ 通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜
⚫ 方案二:实现随机分区
– SQL 中避免数据倾斜,构建随机分区
select * from table distribute by rand();
◼ distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等
◼ 默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实
现随机分区,避免数据倾斜
⚫ 方案三:自动构建随机分区并自动聚合
– 开启随机分区,走两个 MapReduce
hive.groupby.skewindata= true;
◼ 开启该参数以后,当前程序会自动通过两个MapReduce来运行
◼ 第一个MapReduce自动进行随机分区,然后实现聚合
◼ 第二个MapReduce将聚合的结果再按照业务进行处理,得到结果
6.4 Join的数据倾斜
实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,
只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join
产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但
往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾
斜问题:
⚫ 方案一:提前过滤,将大数据变成小数据,实现Map Join
实现两张表的Join时,我们要尽量考虑是否可以使用Map Join来实现Join过程。有些
场景下看起来是大表Join大表,但是我们可以通过转换将大表Join大表变成大表Join小表,
来实现Map Join。
例如:现在有两张表订单表A与用户表B,需要实现查询今天所有订单的用户信息,
关联字段为userid。
A表:今天的订单,1000万条,字段:orderId,userId,produceId,price等
B表:用户信息表,100万条,字段:userid,username,age,phone等
◼ 需求:两张表关联得到今天每个订单的用户信息
◼ 实现1:直接关联,实现大表Join大表
select a. * ,b. * from A join B on a.userid = b.userid;
由于两张表比较大,无法走Map Join,只能走Reduce Join,容易产生数据倾斜。
◼ 实现2:将下了订单的用户的数据过滤出来,再Join
select a. * ,d. * from ( -- 获取所有下订单的用户信息 select b. * from -- 获取所有下订单的 userid ( select distinct a.userid from A a ) c join B b on c.userid = b.userid ) d join A a on d.userid = a.userid;
◼ 100万个用户中,在今天下订单的人数可能只有一小部分,大量数据是不会
Join成功的
◼ 可以提前将订单表中的userid去重,获取所有下订单的用户id
◼ 再使用所有下订单的用户id关联用户表,得到所有下订单的用户的信息
◼ 最后再使用下订单的用户信息关联订单表
◼ 通过多次Map Join来代替Reduce Join,性能更好也可以避免数据倾斜
⚫ 方案二:使用Bucket Join
◼ 如果使用方案一来避免Reduce Join ,有些场景下依旧无法满足,例如过滤后的数据
依旧是一张大表,那么最后的Join依旧是一个Reduce Join
◼ 这种场景下,我们可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数
据倾斜
⚫ 方案三:使用Skew Join
Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的
原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数
据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来
实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和
Reduce Join的结果进行Union合并
◼ 配置
– 开启运行过程中 skewjoin
set hive.optimize.skewjoin= true;
– 如果这个 key 的出现的次数超过这个范围
set hive.skewjoin.key=100000;
– 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime= true;
– 不合并,提升性能
set hive.optimize.union.remove= true;
– 如果 Hive 的底层走的是 MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive= true;
◼ 示例图