Hive优化
本质:HDFS + MapReduce
问题原因:
- 倾斜:
- 分区:有的分区没有数据,有的分区数据堆积。(若按天分区,每一天数据差别大就叫倾斜。)
- group by:有的分组键在表中数据很多,有的分组键数据很少。
- 小表 join 大表:小表数据小,大表数据多,造成倾斜。
如何识别倾斜?
- 若表为分区分桶表,以分区字段作为聚合条件聚合,并进行抽样。
- 若有HDFS的权限,查看分区文件夹的大小是否存在明显差异。
过多:
- join过多导致job过多。
- 小文件过多。
- Mapper或Reducer过多。
使用不当:
count(distinct) ❌ => select FIELD_NAME FROM ... GROUP BY FIELD_NAME;
join ... on ... where
谓词下推(见下文)。select sum(field) from TABLE;
不支持全表聚合。- 可能的解决方法:
select sum(sub_total) from( select sum(order_amount) as sub_total from ( select sum(order_amount),user_id%3 as id from TABLE_NAME )A group by id )A;
- 可能的解决方法:
解决方案
数仓方案考虑整体性地解决问题
模型设计
- 整体最优,考虑全局。
合理减少表数量:
- 数据建模:
- 【星型】,雪花,星座。
- 维度表(静态数据),事实表(动态数据:4W1H)。
- 数仓需要将维度表"融进"事实表中,研究的是维度表中"变化"的数据。
- 维度退化 => 星型。
- sqoop|maxwell|cancal : query "select ... join ..."
- ods -> dwd insert into ... select ... join ...
- 数据建模:
充分了解业务,提前设计好预聚合。
- 分层 => 轻量聚合(获取结果的时候可以不走MapReduce)。
- 分区 => 避免交换。
- 例如:如果表关联条件与分区依据一致,无需进行交换。
- 如何选取分区字段?整体研究,选取最合适的共性字段。不一定要是商品ID,省市县三级分区等。分区尽量往业务上靠。
- 分桶 => 拉链表(分桶表)、抽样。
- "拉链表不一定是分区表,但一定要是分桶表":
- 拉链表可以是非分区的,也就是说,不需要按照日期、地区等键值对历史数据进行分区。可以简单地存储历史数据。
- 拉链表应该是分桶表,分桶有助于在执行数据合并、查找和分析操作时提高性能。
- "拉链表不一定是分区表,但一定要是分桶表":
- 压缩 => 减少体量(现在不太强调)。
- 需考虑压缩和解压缩的成本是否大于时间消耗?
- 压缩格式,表存储格式(是否支持压缩,是否支持切片)。
hadoop 内存管理
- mapred
set mapreduce.map.memory.mb=256;
Map任务内存。set mapreduce.reduce.memory.mb=512;
Reduce任务内存。set mapreduce.map.java.opts=?
Map的JVM。set mapreduce.reduce.java.opts=?
Reduce的JVM。
- yarn
set yarn.nodemanager.resource.memory-mb=-1;
NodeManager的内存。set yarn.scheduler.minimum-allocation-mb=1024;
YARN调度器的最小分配内存。set yarn.scheduler.maximum-allocation-mb=8192;
YARN调度器的最大分配内存。
倾斜:热点数据
- join: 非大小表。
- 原因:连接字段在连接表之间分布不均,或缺乏连接关系(两表的连接字段分配不均)。
- 手动处理:连接键的选择(优先选择在数据的分布上相对均衡为连接键——通过
抽样
找到)。 - 连接键拆分或随机映射(与其选取不均衡的列作为连接键,不如构建随即映射列——
select user_id%3 as id group by id
)。 - 引入hash分区或分桶使得数据分布均衡。
- 增加或减少任务的并行度。
- map join: 大小表。
- 默认true,即默认自动开启 mapjoin。
set hive.auto.convert.join=true;
- 默认小表<=25M。
set hive.mapjoin.smalltable.filesize=25M;
- 默认false,分桶表表mapjoin专用。
set hive.optimize.bucketmapjoin=true;
- combiner: 默认true,即默认开启Mapper端聚合。
set hive.map.aggr=true;
- groupby:HashPartitioner。
- 默认-1,倾斜的倍数(倾斜度) n = 倾斜数据总均量/其他数据总均量 + (其他数据的差异数)。
- 确定是否倾斜与倾斜程度:抽样(tablesample(bucket COUNT outof TOTAL))。
set mapreduce.job.reduces=n;
(见下面 Reducer 数量控制)。- 默认false。
set hive.groupby.skewindata=true;
Mapper或Reduce输出过多小文件合并
若满足以下设置条件,任务结束后会单起MapReduce对输出文件进行合并。
- 默认为true,map-only输出是否合并。
set hive.merge.mapfiles=true;
- 默认为false,mapreduce输出是否合并。
set hive.merge.mapredfiles=true;
- 默认256M,合并文件操作阈值,如果输入数据超过256M,则触发合并操作。
set hive.merge.size.per.task=256M;
- 默认16M,合并文件平均大小小于该阈值则将他们合并为大文件。
set hive.merge.smallfiles.avgsize=16M;
控制Mapper和Reducer数量
Mapper
mapper的启动和初始化开销较大,数量过多导致开销大于逻辑处理,浪费资源。
- 默认的Mapper数量:
- int default_num = total_file_size/dfs.block.size;
- 默认为2, 只有大于2时才会生效。
set mapreduce.job.maps=2;
- Mapper数量有限值:
- Math.max(min.split.size,Math.min(dfs.block.size,max.split.size))
- 通过调整以下三项配置来调整Mapper数量。
- 默认128M。
set dfs.block.size=128M;
- 默认单个Mapper处理数据上限256M。
set mapred.max.split.size=256M;
- 默认1字节。
set mapred.min.split.size=1;
- 默认单个节点处理的数据下限1字节。
set mapred.min.split.size.per.node=1;
- 默认单个机架处理的数据下限
set mapred.min.split.size.per.rack=1;
- Mapper输入多个小文件合并后再切片
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- Mapper切片的大小【越接近128M越好】?
set mapred.min.split.size = N;
- 若文件过大,切片大小尽量调大。
- 需要综合考虑Yarn的内存权限和分布式计算的均衡
- 若表A单行内容量大,且处理逻辑复杂,需要将文件拆分(列裁剪,行筛选)
将数据通过分区表拆分成更小粒度
将数据随机且均匀地分散到不同的Reducer,这有助于均衡负载。
set mapreduce.job.reduces=3; create table A_SPLITS as select * from A distribute by rand(3);
Reducer
- 默认-1,可以根据需要在客户端设置 :
int n = Math.min(SIZE/bytes.per.reducer, reducers.max) | num_partitions
set mapreduce.job.reduces=n;
- 若存在数据倾斜,则 Hive 会单独分配Reducer处理倾斜数据
- 若未设置 Reducer 数量,自动计算 Reducer 数量
- 默认每个Reducer的数据为256M
set hive.exec.reducers.bytes.per.reducer=256M;
- 默认单个任务最大Reducer数量(<1024台)
set hive.exec.reducers.max=1009;
- Reducer只能为1的情况:
- 没有使用 GROUP BY 子句来对数据进行分组,并且只是在原始数据上直接使用了聚合函数(如sum、count、max、min、avg、collect_list、concat_ws等)
- 优化方案
select sum(sum_a) from (select sum(a) from A group by STH)T
- 使用了order by
- 优化方案
select * from (select * from A distribute by a sort by a) order by a;
- 没有使用 GROUP BY 子句来对数据进行分组,并且只是在原始数据上直接使用了聚合函数(如sum、count、max、min、avg、collect_list、concat_ws等)
- 存在笛卡尔积,尽量不用
- 若出现
小表+大表
的笛卡尔积:小表扩展join key,并根据需求复制 DN_COUNT 份,大表扩展join key,根据 DN_COUNT 随机生成- 关闭自动mapjoin :
set hive.auto.convert.join=false
- 设置reducer的数量为:
set mapreduce.job.reduces=DN_COUNT
减少数据规模
调整存储格式
- 关闭自动mapjoin :
- 若出现
- 设置建表格式
set hive.default.fileformat=orc|textfile|rcfile|sequencefile;
- 压缩:为了减少Shuffle在局域网内数据交换产生的时间。
- Mapper压缩
- 开启Mapper输出压缩功能,默认false
set mapreduce.map.output.compress=true;
- 设置Map输出数据的压缩方式:默认DefaultCodec
set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
- 设置任务过程输出是否压缩
set hive.exec.compress.intermediate=true;
- Reducer压缩
- 开启Reducer输出压缩功能;默认false
set hive.exec.compress.output=true;
- reduce最终输出数据压缩;默认false
set mapreduce.output.fileoutputformat.compress=true;
- reduce最终数据输出压缩为块压缩;默认RECORD
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
- reduce最终数据输出压缩方式;默认DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
- 开启Mapper输出压缩功能,默认false
- Mapper压缩
分区
- 动态分区
- 默认开启
set hive.exec.dynamic.partition=true;
- 默认strict
set hive.exec.dynamic.partition.mode=nonstrict;
- 默认最大动态分区数1000
set hive.exec.max.dynamic.partitions=1000;
- 默认单节点最大动态分区数100
set hive.exec.max.dynamic.partitions.pernode=100;
- 动态添加多分区数据(需要一张源数据表)
insert into table TABLE_PARTITION partition(partition_field) select *, partition_field from TABLE_SOURCE where ...;
- 默认开启
- 静态分区
- 静态分区数据挂载
load data [local] inpath 'DATA_PATH' [overwrite|into] table TABLE_PARTITION partition(partition_field=VALUE);
- 查看分区
show partitions TABLE_PARTITION;
- 添加分区
alter table TABLE_PARTITION add partition(partition_field=VALUE);
- 删除分区
alter table TABLE_PARTITION drop partition(partition_field=VALUE);
其他配置
- 静态分区数据挂载
- count(distinct)
- 不妥:
select count(distict b) from TAB group by a
- 稳妥:
select count(b) from (select a,b from TAB group by a,b) group by a
- 不妥:
- CBO (COST BASED OPTIMIZER)
- 默认为true
set hive.cbo.enable=true;
- 默认为true
- 分区裁剪
- 以 on,where 多条件字段顺序,建【多重】分区表
- 设置执行引擎,默认mr, tez|spark(优先)|DAG
set hive.execution.engine=tez;
- 并行执行无依赖job
- 默认false
set hive.exec.parallel=true;
- 设置最大并行任务数,默认为8
set hive.exec.parallel.thread.number=8;
- JVM重用(Hive3已取消,作了解即可)
- 每个JVM运行的任务数
set mapreduce.job.jvm.numtasks = 8;
- 默认false
本地化运算
- 默认1,启动本地化模式reducer数量必须为0|1
set mapreduce.job.reduces=0/1;
- 默认 yarn ,需设置为本地模式
set mapreduce.framework.name=local;
- 开启自动本地化模式
set hive.exec.mode.local.auto=true;
- 设置本地化文件数量上限,默认4
set hive.exec.mode.local.auto.input.files.max=4;
- 默认128M,本地化文件大小上限
set hive.exec.mode.local.auto.inputbytes.max=128M;
- 可能会导致内存溢出:java.lang.OutofMemoryError : java heap space
- 修改 mv hive-env.sh.template hive-env.sh,去掉注释
# export HADOOP_HEAPSIZE=1024
- 修改 mv hive-env.sh.template hive-env.sh,去掉注释
- 默认1,启动本地化模式reducer数量必须为0|1
llap
- 设置执行模式,默认container,2.0后扩展了llap
set hive.execution.mode=llap;
- llap为DataNode常驻进程,混合模型,小型任务可以由llap解决,大任务由yarn容器执行
- 设置执行模式,默认container,2.0后扩展了llap
- fetch
- 默认mode,简单查询不走mr,直接提取
set hive.fetch.task.conversion=more;
- 默认mode,简单查询不走mr,直接提取
- 谓词下推(下推即优化的意思):确定主从表条件应该放在on后还是where后
- 开启
- set hive.optimize.ppd=true;
- 规则
- 左右外连接
- 主表:on不可下推,where可下推
- 从表:on可下推,where不可下推
- 内连接
- on和where都下推
- 全外连接
- on和where都不下推
- 左右外连接
- 开启