Hive数据倾斜的原因以及常用解决方案

简介: Hive数据倾斜的原因以及常用解决方案

在Hadoop平台的hive数据库进行开发的时候,数据倾斜也是比较容易遇到的问题,这边文章对数据倾斜的定义以及产生的原因、对应的解决方案进行学习。

一、数据倾斜的定义

数据倾斜:数据分布不均匀,造成数据大量的集中到一点,造成数据热点。主要表现为任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。

二、数据倾斜产生的原因

1,key 分布不均匀。
2,业务数据本身的特性。
3,建表考虑不周全。
4,某些 HQL 语句本身就存在数据倾斜。

三、数据倾斜的类型以及对应解决方案

1,空值产生的数据倾斜

原理:使本身为 null 的所有记录不会拥挤在同一个 reduceTask 了,会由于有替代的 随机字符串值,而分散到了多个 reduceTask 中了,由于 null 值关联不上,处理后并不影响最终结果。

示例:在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。

常用解决方案:
(1)user_id 为空的不参与关联

select * from log a join user b on a.user_id is not null and a.user_id = b.user_id
union all
select * from log c where c.user_id is null;

(2)赋予空值新的 key 值
原理:把空值的 key 变成一个字符串加上一个随机数,就能把造成数据倾斜的 数据分到不同的 reduce 上解决数据倾斜的问题。

select * from log a left outer join user b on
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id

2,不同数据类型关联产生数据倾斜

示例:用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进 行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中。

把数字类型 id 转换成 string 类型的 id

select * from user a 
left outer join log b on b.user_id = cast(a.user_id as string)

3,join 倾斜

map join 概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从 而避免了 reduceTask,前提要求是内存足以装下该全量数据。

自动开启 map join 优化,由两个参数控制:

set hive.auto.convert.join=true; //设置 MapJoin 优化自动开启
set hive.mapjoin.smalltable.filesize=25000000 //设置小表不超过多大时开启 mapjoin 优化

以大表 a 和小表 b 为例,所有的 maptask 节点都装载小表 b 的所有数据,然后大表 a 的 一个数据块数据比如说是 a1 去跟 b 全量数据做链接,就省去了 reduce 做汇总的过程。 所以相对来说,在内存允许的条件下使用 map join 比直接使用 MapReduce 效率还高些, 当然这只限于做 join 查询的时候。

select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid =
b.movieid;

注意:使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常 高,但如果小表很大,大到 map join 会出现 bug 或异常,这时就需要特别的处理

4,group by 倾斜

group by的维度太少,某值的数量太多(如性别sex,只有男和女,group by时只有两个维度,每个维度的数据量都很大),从而导致处理某个值数据的reduce处理非常耗时。

参数调整:

hive.map.aggr = true    // Map 端部分聚合,相当于Combiner;
hive.groupby.skewindata=true    //有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

5,count distinct 倾斜

某特殊值较多,数据将会在一个reduce中处理,处理此特殊值的reduce耗时。

count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

四、数据倾斜解决方法总结

1,参数调节:hive.map.aggr = true 在map端部分聚合。
2,参数调节:hive.groupby.skewindata=true 数据倾斜时负载均衡。
3,sql语句调节:join时选择key值分布较均匀的表作为驱动表,同时做好列裁剪和分区裁剪,以减少数据量。
4,sql语句调节:大小表join时,小表先进内存。
5,sql语句调节:大表join大表时,把key值为空的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,因此处理后不影响最终结果。

五、数据倾斜解决其他方法

1,调整mapper数

mapper数量与输入文件的split数息息相关,在Hadoop源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述mapper数是如何确定的。

可以直接通过参数mapred.map.tasks(默认值2)来设定mapper数的期望值,但它不一定会生效,下面会提到。

设输入文件的总大小为total_input_size。HDFS中,一个块的大小由参数dfs.block.size指定,默认值64MB或128MB。在默认情况下,mapper数就是:
default_mapper_num = total_input_size / dfs.block.size。
参数mapred.min.split.size(默认值1B)和mapred.max.split.size(默认值64MB)分别用来指定split的最小和最大大小。split大小和split数计算规则是:
split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size));
split_num = total_input_size / split_size。

得出mapper数:
mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))。
可见,如果想减少mapper数,就适当调高mapred.min.split.size,split数就减少了。如果想增大mapper数,除了降低mapred.min.split.size之外,也可以调高mapred.map.tasks。
一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,

2,调整reducer数

reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks可以直接设定reducer数量,不像mapper一样是期望值。但如果不设这个参数的话,Hive就会自行推测,逻辑如下:

参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量,默认值1G(1.2版本之前)或256M(1.2版本之后)。

参数hive.exec.reducers.max用来设定每个job的最大reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)。

得出reducer数:
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。

3,合并小文件

(1)输入阶段合并
需要更改Hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。
这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。具体逻辑可以参看Hive源码中的对应类。

(2)输出阶段合并
直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。
另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。

参考文档:https://www.jianshu.com/p/deb4a6f91d3b

相关文章
|
3月前
|
SQL 数据处理 HIVE
HIVE的数据倾斜调优
hive数据倾斜主要是由shuffle引起的,而引起shuffle的又主要有四种情况,分别为: 1.group by 2.join 3.count(distinct) 4.开窗函数
75 8
|
4月前
|
SQL 存储 分布式计算
|
7月前
|
SQL 缓存 分布式计算
手把手教你解决 Hive 的数据倾斜
数据倾斜是 Hive 中影响任务执行效率的现象,表现为某些任务处理的数据量或耗时远超其他任务。根本原因是 Shuffle 后 Key 分布不均,导致部分 Reduce 负载过高。常见场景包括空值聚合、不可拆分大文件、数值膨胀、不同数据类型 Join、Count(distinct) 计算以及表 Join 操作。解决方法包括过滤空值、转换数据类型、调整聚合策略、使用 MapJoin 等。通过合理优化,如设置 `hive.groupby.skewindata` 和 `hive.map.aggr` 参数,可以有效缓解数据倾斜问题。
760 2
|
7月前
|
SQL 分布式计算 算法
【Hive】数据倾斜怎么解决?
【4月更文挑战第16天】【Hive】数据倾斜怎么解决?
|
7月前
|
SQL HIVE
Hive数据倾斜处理集合
Hive数据倾斜处理集合
148 0
|
7月前
|
SQL 数据采集 分布式计算
Hadoop和Hive中的数据倾斜问题及其解决方案
Hadoop和Hive中的数据倾斜问题及其解决方案
116 0
|
7月前
|
SQL HIVE
Hive group by 数据倾斜问题处理
Hive group by 数据倾斜问题处理
105 0
|
SQL 分布式计算 运维
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
上一篇文章介绍了sqoop全量同步数据到hive, 本片文章将通过实验详细介绍如何增量同步数据到hive,以及sqoop job与crontab定时结合无密码登录的增量同步实现方法。
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
|
SQL 运维 分布式计算
【大数据开发运维解决方案】Sqoop全量同步mysql/Oracle数据到hive
前面文章写了如何部署一套伪分布式的handoop+hive+hbase+kylin环境,也介绍了如何在这个搭建好的伪分布式环境安装配置sqoop工具以及安装完成功后简单的使用过程中出现的错误及解决办法, 接下来本篇文章详细介绍一下使用sqoop全量同步oracle/mysql数据到hive,这里实验采用oracle数据库为例,
【大数据开发运维解决方案】Sqoop全量同步mysql/Oracle数据到hive
|
SQL 分布式计算 运维
【大数据开发运维解决方案】sqoop增量导入oracle/mysql数据到hive时时间字段为null处理
前面几篇文章详细介绍了sqoop全量增量导入数据到hive,大家可以看到我导入的数据如果有时间字段的话我都是在hive指定成了string类型,虽然这样可以处理掉时间字段在hive为空的问题,但是在kylin创建增量cube时需要指定一个时间字段来做增量,所以上面那种方式不行,这里的处理方式为把string改成timestamp类型,看实验:
【大数据开发运维解决方案】sqoop增量导入oracle/mysql数据到hive时时间字段为null处理