hive性能优化小结(2)

简介: hive性能优化小结(2)

union all 优化

 利用hive对UNION ALL的优化的特性,hive对union all优化只局限于非嵌套查询

示例:--3个JOB
select * from
(select ci,c2,c3 from t1 Group by c1,c2,c3
Union all
Select c1,c2,c3 from t2 Group by c1,c2,c3
) t3;
--优化后的示例代码如下:--1个JOB
select * from
(select * from t1
Union all
Select * from t2
) t3
Group by c1,c2,c3;

不同表太多的union ALL,不推荐使用;

通常采用建临时分区表,将不同表的结果insert到不同的分区(可并行),最后再统一处理;

示例:
INSERT overwrite TABLE lxw_test(flag = '1')
SELECT sndaid,mobileFROM lxw_test1;
INSERT overwrite TABLE lxw_test(flag = '2')
SELECT sndaid,mobileFROM lxw_test2;
INSERT overwrite TABLE lxw_test(flag = '3')
SELECT sndaid,mobileFROM lxw_test3;

CBO优化

CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive 的成本优化器也一样。join 的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描

Hive 自 0.14.0 开始,加入了一项 "Cost based Optimizer" 来对 HQL 执行计划进行优化,这个功能通过 "hive.cbo.enable" 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以自动优化 HQL中多个 Join 的顺序,并选择合适的 Join 算法。

当你要做那种连结条件相同的表和表 join ,例如

select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;

 如果你要使用这个功能,可以把下面的参数都打开即可

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

合理设置 Map 及 Reduce 数

调整mapper数

mapper数量与输入文件的split数息息相关,在Hadoop源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述mapper数是如何确定的。

可以直接通过参数mapred.map.tasks(默认值2)来设定mapper数的期望值,但它不一定会生效,下面会提到。

设输入文件的总大小为total_input_size。HDFS中,一个块的大小由参数dfs.block.size指定,默认值64MB或128MB。在默认情况下,mapper数就是:

default_mapper_num = total_input_size / dfs.block.size。
参数mapred.min.split.size(默认值1B)和mapred.max.split.size(默认值64MB)分别用来指定split的最小和最大大小。split大小和split数计算规则是:
split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size));
split_num = total_input_size / split_size。
得出mapper数:
mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))。

可见,如果想减少mapper数,就适当调高mapred.min.split.size,split数就减少了。如果想增大mapper数,除了降低mapred.min.split.size之外,也可以调高mapred.map.tasks。


一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,得参考下面“合并小文件”一节的方法处理。

参数设置:

set mapred.map.tasks=10; ## 默认值是2

 

调整reducer数

reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks可以直接设定reducer数量,不像mapper一样是期望值。但如果不设这个参数的话,Hive就会自行推测,逻辑如下:

参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量,默认值1G(1.2版本之前)或256M(1.2版本之后)。

参数hive.exec.reducers.max用来设定每个job的最大reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)。

得出reducer数:

reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。

reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。

// 每个 reduceTask 处理的最大数据大小
参数1:hive.exec.reducers.bytes.per.reducer (默认256M)
// reduceTask 的个数上限
参数2:hive.exec.reducers.max (默认为1009)
参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数进行设置)
reduceTask_num = Math.min(参数2,总输入数据大小 / 参数1)

合并小文件

小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:

设置map输入的小文件合并

set mapred.max.split.size=256000000;  
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)  
set mapred.min.split.size.per.rack=100000000;
//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

设置map输出和reduce输出进行合并的相关参数:

//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000

    sql方式合并小文件

    insert overwrite table test [partition(hour=...)] select * from tes

    参数优化

    set hive.optimize.countdistinct=true开启对count(distinct )的自动优化
    set hive.auto.convert.join = true;开启自动mapjoin
    set hive.mapjoin.smalltable.filesize=26214400;大表小表的阈值设置(默认25M一下认为是小表)
    set hive.exec.parallel=true;打开任务并行执行
    set hive.exec.parallel.thread.number=16;同一个sql允许最大并行度,默认值为8。默认情况下,Hive一次只会执行一个阶段。开启并行执行时会把一个sql语句中没有相互依赖的阶段并行去运行,这样可能使得整个job的执行时间缩短。提高集群资源利用率,不过这当然得是在系统资源比较空闲的时候才有优势,否则没资源,并行也起不来。
    set hive.map.aggr=true;默认值是true,当选项设定为true时,开启map端部分聚合
    set hive.groupby.skewindata = ture;默认值是false,当有数据倾斜的时候进行负载均衡,生成的查询计划有两个MapReduce任务,第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作
    set hive.mapred.mode=strict;设置严格模式,默认值是nonstrict非严格模式。严格模式下会禁止以下3种类型不合理查询,即以下3种情况会报错
                对于查询分区表,必须where加上分区限制条件
                使用order by全局排序时,必须加上limit限制数据查询条数
                限制了笛卡尔积查询
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;设置map端执行前合并小文件
    set hive.exec.compress.output=true;设置hive的查询结果输出是否进行压缩
    set mapreduce.output.fileoutputformat.compress=true;设置MapReduce Job的结果输出是否使用压缩
    set hive.cbo.enable=false;关闭CBO优化,默认值true开启,可以自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法

    数据倾斜优化

    数据倾斜表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。

    单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。

    数据倾斜常见原因:

    60a6bcefe26f4b118e50f46e4d0afd1d.png

    数据倾斜解决方案:

    (a)参数调节:hive.map.aggr = true 在map端部分聚合,hive.groupby.mapaggr.checkinterval = 100000在 Map 端进行聚合操作的条目数目
    (b)参数调节:hive.groupby.skewindata=true 数据倾斜时负载均衡
    (c)sql语句调节:join时选择key值分布较均匀的表作为驱动表,同时做好列裁剪和分区裁剪,以减少数据量
    (d)sql语句调节:大小表join时,小表先进内存
    (e)参数条件:开启mapSide join
    (f)sql语句调节:大表join大表时,把key值为空的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,因此处理后不影响最终结果
    (h)sql语句调节:大表join大表时,sort merge bucket join
    (i)sql语句调节:聚合计算依赖的 key 分布不均匀时就会发生数据倾斜
    用两次 group by 代替 count distinct
    不同指标的 count distinct 放到多段 SQL 中执行,执行后再 UNION 或 JOIN 合并
    (j)参数条件:set hive.optimize.skewjoin = true;
    (k)参数条件:set hive.skewjoin.key = 250000000
     (l) sql语句调节: group by维度过小:采用sum() ,group by的方式来替换count(distinct(字段名))完成计算。

    常见调整参数:

    set hive.exec.reducers.max=200;
    set mapred.reduce.tasks= 200;---增大Reduce个数
    set hive.groupby.mapaggr.checkinterval=100000 ;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
    set hive.groupby.skewindata=true; --如果是group by过程出现倾斜 应该设置为true
    set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
    set hive.optimize.skewjoin=true;--如果是join 过程出现倾斜 应该设置为true
    set mapreduce.reduce.memory.mb=1024mb  --调整reducer所需的执行内存
    set hive.skewjoin.mapjoin.map.tasks=10000;
    set hive.skewjoin.mapjoin.min.split=33554432;

    使用Hive元数据做监控

    监控普通表存储的文件的平均大小

    对于大的文件块可能导致数据在读取时产生数据倾斜,影响集群任务的运行效率。下面的代码是对大于两倍HDFS文件块大小的表。

    --整体逻辑通过DBS找到对应库下面的表TBLS
    --再通过找到每个表对应的表属性,取得totalsize和numFiles两个属性,前者表示文件大小,后者表示文件数量
    select tbl_name,avgfilesize'fileSize(MB)'
    from (
    select tp.totalSize/(1024*1024)/numFiles avgfilesize,
    TBL NAME
    from DBS d
    /*DBS的主键DB_ID*/
    inner join TBLS t on d.DB_ID=t.DB_ID
    left join(
    select TBL ID,
    /*每个表存储的文件个数*/
    max(case PARAM_KEY when 'numFiles'
    then PARAM_VALUE else 0 end) numFiles,
    /*文件存储的大小*/
    max(case PARAM KEY when 'totalSize'
    then PARAM_VALUE else 0 end) totalSize
    /*TABLE PARAMS 记录的表属性*/
    from TABLE PARAMS
    GROUP BY TBL ID
    )tp on t.TBL ID=tp.TBL_ID
    where d.`NAME`='数据库名,
    and tp.numFiles is not null
    and tp.numFiles>0
    )a where avgfilesize> hdfs的文件块大小*2

    监控分区存储的文件平均大小

    大于两倍HDFS文件块大小的分区,示例如下:

    --先用DBS关联TBLS表,TBLS表关联PARTITIONS表
    PARTITION 表关联 PARTITION PARAMS
    select tbl name,PART_NAME, avgfilesize'fileSize (MB) ' from (
    select pp.totalSize/(1024*1024)/numFiles avgfilesize,
    TBL NAME,
    part.PART NAME
    from DBS d
    inner join TBLS t on d.DB ID=t.DB_ID
    inner join `PARTITIONS` part on t.TBL _ID=part.TBL_ID
    left join (
    select PART_ID,
    /*每个表存储的文件个数*/
    max(case PARAM_KEY when 'numFiles'
    then PARAM VALUE else O end) numFiles,
    /*文件存储的大小*/
    max(case PARAM_KEY when 'totalSize'
    then PARAM_VALUE else O end) totalSize
    /*TABLE_PARAMS 记录的表属性*/
    from PARTITION_PARAMS
    GROUP BY PART_ID
    )pp on part.PART_ID=pp.PART_ID
    where d.`NAME`='要监控的数据库名,
    and pp.numFiles is not null
    and pp.numFiles>0
    ) a where avgfilesize> hdfs的文件块大小*2

    监控大表不分区的表

    对于大数据量的表,如果不进行分区,意味着程序在读取

    相同的数据时需要遍历更多的文件块。下面是监控该示例的代码:

    /*监控大表不分区的表*/
    select t.TBL_NAME '表名',d.`NAME`'库名',
    totalSize/1024/1024/1024 '文件大小(GB)'
    from DBS d
    /*DBS的主键DB_ID*/
    inner join TBLS t on d.DB_ID=t.DB_ID
    inner join (
    select TBL ID,
    /*文件存储的大小*/
    max(case PARAM_KEY when 'totalSize'
    then PARAM_VALUE else 0 end) totalSize
    /*TABLE_PARAMS 记录的表属性*/
    from TABLEPARAMS
    GROUP BY TBL_ID
     )tp on t.TBL ID=tp.TBL ID
    left join(
    select distinct TBL ID from`PARTITIONS`
    ) part on t.TBL ID=part.TBL ID
    /*part.TBL_ID is null表示不存在分区*/
    where d.`NAME`='需要监控的库名'
    and part.TBL ID is nu11
    /*数据量大于30GB的表*/
    and totalSize/1024/1024/1024>30

    监控分区数据不均匀的表

    分区不均匀的数据,可能意味着自己的分区列设计存在问题,或者某个分区的数据写入业务有调整,导致数据急速上升或者下跌,这时我们需要做特别的关注。监控的示例如下:

    select TBL_NAME,
    max(totalSize),
    min(totalSize),
    avg(totalSize)
    from (
    select pp.totalSize,
    TBL NAME,
    part.PART_NAME
    from DBS d
    inner join TBLS t on d.DB_ID=t.DB_ID
    inner join 'PARTITIONS' part on t.TBL_ID=part.TBL_ID inner join (
    select PART_ID,
    /*文件存储的大小*/
    max(case PARAM_KEY when 'totalSize'
    then PARAM_VALUE else 0 end)/1024/1024 totalSize
    /*TABLE_PARAMS 记录的表属性*/
    from PARTITION_PARAMS
    GROUP BY PART_ID
    )pp on part.PART_ID=pp.PART_ID
    where d.`NAME`='default'
    and pp.totalSize is not null
    and pp.totalSize>0
    ) a group by TBL_NAME
    having max(totalSize)>avg(totalSize)*5

    并行执行与本地执行

    • 并行执行Hive中互相没有依赖关系的job间是可以并行执行的,最典型的就是多个子查询union all。在集群资源相对充足的情况下,可以开启并行执行,即将参数hive.exec.parallel设为true。另外hive.exec.parallel.thread.number可以设定并行执行的线程数,默认为8,一般都够用。
    • 本地模式Hive也可以不将任务提交到集群进行运算,而是直接在一台节点上处理。因为消除了提交到集群的overhead,所以比较适合数据量很小,且逻辑不复杂的任务。 设置hive.exec.mode.local.auto为true可以开启本地模式。但任务的输入数据总量必须小于hive.exec.mode.local.auto.inputbytes.max(默认值128MB),且mapper数必须小于hive.exec.mode.local.auto.tasks.max(默认值4),reducer数必须为0或1,才会真正用本地模式执行。

    严格模式

    严格模式就是强制不允许用户执行3种有风险的HiveSQL语句,一旦执行会直接失败。这3种语句是:

    • 查询分区表时不限定分区列的语句;
    • 两表join产生了笛卡尔积的语句;
    • 用order by来排序但没有指定limit的语句。

    要开启严格模式,需要将参数hive.mapred.mode设为strict。

    JVM容器重用

    在MR job中,默认是每执行一个task就启动一个JVM。如果task非常小而碎,那么JVM启动和关闭的耗时就会很长。可以通过调节参数mapred.job.reuse.jvm.num.tasks来重用。例如将这个参数设成5,那么就代表同一个MR job中顺序执行的5个task可以重复使用一个JVM,减少启动和关闭的开销。但它对不同MR job中的task无效。









    相关文章
    |
    SQL 分布式计算 监控
    Hive性能优化之计算Job执行优化 2
    Hive性能优化之计算Job执行优化
    254 1
    |
    SQL 存储 分布式计算
    Hive性能优化之表设计优化1
    Hive性能优化之表设计优化1
    92 1
    |
    9月前
    |
    SQL 存储 分布式计算
    Hive的性能优化有哪些方法?请举例说明。
    Hive的性能优化有哪些方法?请举例说明。
    199 0
    |
    9月前
    |
    SQL 存储 关系型数据库
    Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
    Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
    1027 0
    |
    SQL 存储 分布式计算
    Hive性能优化之表设计优化2
    Hive性能优化之表设计优化2
    110 1
    |
    SQL 分布式计算 资源调度
    Hive性能优化之计算Job执行优化 1
    Hive性能优化之计算Job执行优化
    192 0
    Hive性能优化之计算Job执行优化 1
    |
    SQL 存储 分布式计算
    Hive企业级性能优化
    Hive作为大数据平台举足轻重的框架,以其稳定性和简单易用性也成为当前构建企业级数据仓库时使用最多的框架之一。
    365 0
    Hive企业级性能优化
    |
    SQL 分布式计算 大数据
    分享一个 HIVE SQL 性能优化点-使用公共表表达式 CTE 替换临时表
    分享一个 HIVE SQL 性能优化点-使用公共表表达式 CTE 替换临时表
    |
    存储 SQL 分布式计算
    hive性能优化小结(1)
    hive性能优化小结(1)
    hive性能优化小结(1)
    |
    SQL 分布式计算 负载均衡
    Hive性能优化(全面)
    Hadoop的计算框架特性下的HIve有效的优化手段
    Hive性能优化(全面)

    热门文章

    最新文章