Hive 优化总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: Hive优化主要涉及HDFS和MapReduce的使用。问题包括数据倾斜、操作过多和不当使用。识别倾斜可通过检查分区文件大小或执行聚合抽样。解决方案包括整体优化模型设计,如星型、雪花模型,合理分区和分桶,以及压缩。内存管理需调整mapred和yarn参数。倾斜数据处理通过选择均衡连接键、使用map join和combiner。控制Mapper和Reducer数量以避免小文件和资源浪费。减少数据规模可调整存储格式和压缩,动态或静态分区管理,以及优化CBO和执行引擎设置。其他策略包括JVM重用、本地化运算和LLAP缓存。

Hive优化

本质:HDFS + MapReduce

问题原因:

  1. 倾斜
    • 分区:有的分区没有数据,有的分区数据堆积。(若按天分区,每一天数据差别大就叫倾斜。)
    • group by:有的分组键在表中数据很多,有的分组键数据很少。
    • 小表 join 大表:小表数据小,大表数据多,造成倾斜。

如何识别倾斜?

    - 若表为分区分桶表,以分区字段作为聚合条件聚合,并进行抽样。
    - 若有HDFS的权限,查看分区文件夹的大小是否存在明显差异。
  1. 过多

    • join过多导致job过多。
    • 小文件过多。
    • Mapper或Reducer过多。
  2. 使用不当

    • count(distinct) ❌ => select FIELD_NAME FROM ... GROUP BY FIELD_NAME;
    • join ... on ... where 谓词下推(见下文)。
    • select sum(field) from TABLE; 不支持全表聚合。
      • 可能的解决方法:
          select sum(sub_total)
          from(
              select sum(order_amount) as sub_total
              from (
                  select sum(order_amount),user_id%3 as id
                  from TABLE_NAME
              )A group by id
          )A;
        

解决方案

数仓方案考虑整体性地解决问题

模型设计

  • 整体最优,考虑全局。
  • 合理减少表数量:

    • 数据建模:
      • 【星型】,雪花,星座。
      • 维度表(静态数据),事实表(动态数据:4W1H)。
      • 数仓需要将维度表"融进"事实表中,研究的是维度表中"变化"的数据。
      • 维度退化 => 星型。
    • sqoop|maxwell|cancal : query "select ... join ..."
    • ods -> dwd insert into ... select ... join ...
  • 充分了解业务,提前设计好预聚合。

    • 分层 => 轻量聚合(获取结果的时候可以不走MapReduce)。
    • 分区 => 避免交换。
      • 例如:如果表关联条件与分区依据一致,无需进行交换。
      • 如何选取分区字段?整体研究,选取最合适的共性字段。不一定要是商品ID,省市县三级分区等。分区尽量往业务上靠。
    • 分桶 => 拉链表(分桶表)、抽样。
      • "拉链表不一定是分区表,但一定要是分桶表":
        • 拉链表可以是非分区的,也就是说,不需要按照日期、地区等键值对历史数据进行分区。可以简单地存储历史数据。
        • 拉链表应该是分桶表,分桶有助于在执行数据合并、查找和分析操作时提高性能。
    • 压缩 => 减少体量(现在不太强调)。
      • 需考虑压缩和解压缩的成本是否大于时间消耗?
      • 压缩格式,表存储格式(是否支持压缩,是否支持切片)。

        hadoop 内存管理

  • mapred
    • set mapreduce.map.memory.mb=256; Map任务内存。
    • set mapreduce.reduce.memory.mb=512; Reduce任务内存。
    • set mapreduce.map.java.opts=? Map的JVM。
    • set mapreduce.reduce.java.opts=? Reduce的JVM。
  • yarn
    • set yarn.nodemanager.resource.memory-mb=-1; NodeManager的内存。
    • set yarn.scheduler.minimum-allocation-mb=1024; YARN调度器的最小分配内存。
    • set yarn.scheduler.maximum-allocation-mb=8192; YARN调度器的最大分配内存。

倾斜:热点数据

  • join: 非大小表。
    • 原因:连接字段在连接表之间分布不均,或缺乏连接关系(两表的连接字段分配不均)。
    • 手动处理:连接键的选择(优先选择在数据的分布上相对均衡为连接键——通过抽样找到)。
    • 连接键拆分或随机映射(与其选取不均衡的列作为连接键,不如构建随即映射列——select user_id%3 as id group by id)。
    • 引入hash分区或分桶使得数据分布均衡。
    • 增加或减少任务的并行度。
  • map join: 大小表。
    • 默认true,即默认自动开启 mapjoin。
    • set hive.auto.convert.join=true;
    • 默认小表<=25M。
    • set hive.mapjoin.smalltable.filesize=25M;
    • 默认false,分桶表表mapjoin专用。
    • set hive.optimize.bucketmapjoin=true;
  • combiner: 默认true,即默认开启Mapper端聚合。
    • set hive.map.aggr=true;
  • groupby:HashPartitioner。
    • 默认-1,倾斜的倍数(倾斜度) n = 倾斜数据总均量/其他数据总均量 + (其他数据的差异数)。
    • 确定是否倾斜与倾斜程度:抽样(tablesample(bucket COUNT outof TOTAL))。
    • set mapreduce.job.reduces=n; (见下面 Reducer 数量控制)。
    • 默认false。
    • set hive.groupby.skewindata=true;

Mapper或Reduce输出过多小文件合并

若满足以下设置条件,任务结束后会单起MapReduce对输出文件进行合并。

  • 默认为true,map-only输出是否合并。
    • set hive.merge.mapfiles=true;
  • 默认为false,mapreduce输出是否合并。
    • set hive.merge.mapredfiles=true;
  • 默认256M,合并文件操作阈值,如果输入数据超过256M,则触发合并操作。
    • set hive.merge.size.per.task=256M;
  • 默认16M,合并文件平均大小小于该阈值则将他们合并为大文件。
    • set hive.merge.smallfiles.avgsize=16M;

控制Mapper和Reducer数量

Mapper

mapper的启动和初始化开销较大,数量过多导致开销大于逻辑处理,浪费资源。

  • 默认的Mapper数量:
    • int default_num = total_file_size/dfs.block.size;
  • 默认为2, 只有大于2时才会生效。
    • set mapreduce.job.maps=2;
  • Mapper数量有限值:
    • Math.max(min.split.size,Math.min(dfs.block.size,max.split.size))
    • 通过调整以下三项配置来调整Mapper数量。
      • 默认128M。
      • set dfs.block.size=128M;
      • 默认单个Mapper处理数据上限256M。
      • set mapred.max.split.size=256M;
      • 默认1字节。
      • set mapred.min.split.size=1;
  • 默认单个节点处理的数据下限1字节。
    • set mapred.min.split.size.per.node=1;
  • 默认单个机架处理的数据下限
    • set mapred.min.split.size.per.rack=1;
  • Mapper输入多个小文件合并后再切片
    • set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  • Mapper切片的大小【越接近128M越好】?
    • set mapred.min.split.size = N;
    • 若文件过大,切片大小尽量调大。
    • 需要综合考虑Yarn的内存权限和分布式计算的均衡
  • 若表A单行内容量大,且处理逻辑复杂,需要将文件拆分(列裁剪,行筛选)
  • 将数据通过分区表拆分成更小粒度

  • 将数据随机且均匀地分散到不同的Reducer,这有助于均衡负载。

    set mapreduce.job.reduces=3;
    create table A_SPLITS as
    select * from A distribute by rand(3);
    
Reducer
  • 默认-1,可以根据需要在客户端设置 :
    • int n = Math.min(SIZE/bytes.per.reducer, reducers.max) | num_partitions
    • set mapreduce.job.reduces=n;
  • 若存在数据倾斜,则 Hive 会单独分配Reducer处理倾斜数据
  • 若未设置 Reducer 数量,自动计算 Reducer 数量
  • 默认每个Reducer的数据为256M
    • set hive.exec.reducers.bytes.per.reducer=256M;
  • 默认单个任务最大Reducer数量(<1024台)
    • set hive.exec.reducers.max=1009;
  • Reducer只能为1的情况:
    • 没有使用 GROUP BY 子句来对数据进行分组,并且只是在原始数据上直接使用了聚合函数(如sum、count、max、min、avg、collect_list、concat_ws等)
      • 优化方案
      • select sum(sum_a) from (select sum(a) from A group by STH)T
    • 使用了order by
      • 优化方案
      • select * from (select * from A distribute by a sort by a) order by a;
  • 存在笛卡尔积,尽量不用
    • 若出现小表+大表的笛卡尔积:小表扩展join key,并根据需求复制 DN_COUNT 份,大表扩展join key,根据 DN_COUNT 随机生成
      • 关闭自动mapjoin : set hive.auto.convert.join=false
      • 设置reducer的数量为:set mapreduce.job.reduces=DN_COUNT
        image.png

        减少数据规模

        调整存储格式
  • 设置建表格式
    • set hive.default.fileformat=orc|textfile|rcfile|sequencefile;
  • 压缩:为了减少Shuffle在局域网内数据交换产生的时间。
    • Mapper压缩
      • 开启Mapper输出压缩功能,默认false
        • set mapreduce.map.output.compress=true;
      • 设置Map输出数据的压缩方式:默认DefaultCodec
        • set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
          • 设置任务过程输出是否压缩
          • set hive.exec.compress.intermediate=true;
          • Reducer压缩
          • 开启Reducer输出压缩功能;默认false
            • set hive.exec.compress.output=true;
          • reduce最终输出数据压缩;默认false
          • set mapreduce.output.fileoutputformat.compress=true;
          • reduce最终数据输出压缩为块压缩;默认RECORD
          • set mapreduce.output.fileoutputformat.compress.type=BLOCK;
          • reduce最终数据输出压缩方式;默认DefaultCodec
          • set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
分区
  • 动态分区
    • 默认开启
      • set hive.exec.dynamic.partition=true;
    • 默认strict
      • set hive.exec.dynamic.partition.mode=nonstrict;
    • 默认最大动态分区数1000
      • set hive.exec.max.dynamic.partitions=1000;
    • 默认单节点最大动态分区数100
      • set hive.exec.max.dynamic.partitions.pernode=100;
    • 动态添加多分区数据(需要一张源数据表)
      • insert into table TABLE_PARTITION partition(partition_field) select *, partition_field from TABLE_SOURCE where ...;
  • 静态分区
    • 静态分区数据挂载
      • load data [local] inpath 'DATA_PATH' [overwrite|into] table TABLE_PARTITION partition(partition_field=VALUE);
    • 查看分区
      • show partitions TABLE_PARTITION;
    • 添加分区
      • alter table TABLE_PARTITION add partition(partition_field=VALUE);
    • 删除分区
      • alter table TABLE_PARTITION drop partition(partition_field=VALUE);

        其他配置

  • count(distinct)
    • 不妥:select count(distict b) from TAB group by a
    • 稳妥:select count(b) from (select a,b from TAB group by a,b) group by a
  • CBO (COST BASED OPTIMIZER)
    • 默认为true
      • set hive.cbo.enable=true;
  • 分区裁剪
    • 以 on,where 多条件字段顺序,建【多重】分区表
  • 设置执行引擎,默认mr, tez|spark(优先)|DAG
    • set hive.execution.engine=tez;
  • 并行执行无依赖job
    • 默认false
      • set hive.exec.parallel=true;
    • 设置最大并行任务数,默认为8
      • set hive.exec.parallel.thread.number=8;
        image.png- JVM重用(Hive3已取消,作了解即可)
    • 每个JVM运行的任务数
      • set mapreduce.job.jvm.numtasks = 8;
  • 本地化运算

    • 默认1,启动本地化模式reducer数量必须为0|1
      • set mapreduce.job.reduces=0/1;
    • 默认 yarn ,需设置为本地模式
      • set mapreduce.framework.name=local;
    • 开启自动本地化模式
      • set hive.exec.mode.local.auto=true;
    • 设置本地化文件数量上限,默认4
      • set hive.exec.mode.local.auto.input.files.max=4;
    • 默认128M,本地化文件大小上限
      • set hive.exec.mode.local.auto.inputbytes.max=128M;
    • 可能会导致内存溢出:java.lang.OutofMemoryError : java heap space
      • 修改 mv hive-env.sh.template hive-env.sh,去掉注释# export HADOOP_HEAPSIZE=1024
  • llap

    • 设置执行模式,默认container,2.0后扩展了llap
      • set hive.execution.mode=llap;
    • llap为DataNode常驻进程,混合模型,小型任务可以由llap解决,大任务由yarn容器执行
  • fetch
    • 默认mode,简单查询不走mr,直接提取
      • set hive.fetch.task.conversion=more;
  • 谓词下推(下推即优化的意思):确定主从表条件应该放在on后还是where后
    • 开启
      • set hive.optimize.ppd=true;
    • 规则
      • 左右外连接
        • 主表:on不可下推,where可下推
        • 从表:on可下推,where不可下推
      • 内连接
        • on和where都下推
      • 全外连接
        • on和where都不下推
目录
相关文章
|
7月前
|
SQL 存储 分布式计算
Hive数据仓库设计与优化策略:面试经验与必备知识点解析
本文深入探讨了Hive数据仓库设计原则(分区、分桶、存储格式选择)与优化策略(SQL优化、内置优化器、统计信息、配置参数调整),并分享了面试经验及常见问题,如Hive与RDBMS的区别、实际项目应用和与其他组件的集成。通过代码样例,帮助读者掌握Hive核心技术,为面试做好充分准备。
651 0
|
SQL 分布式计算 监控
Hive性能优化之计算Job执行优化 2
Hive性能优化之计算Job执行优化
234 1
|
SQL 存储 分布式计算
Hive性能优化之表设计优化1
Hive性能优化之表设计优化1
84 1
|
6月前
|
SQL 资源调度 数据库连接
Hive怎么调整优化Tez引擎的查询?在Tez上优化Hive查询的指南
在Tez上优化Hive查询,包括配置参数调整、理解并行化机制以及容器管理。关键步骤包括YARN调度器配置、安全阀设置、识别性能瓶颈(如mapper/reducer任务和连接操作),理解Tez如何动态调整mapper和reducer数量。例如,`tez.grouping.max-size` 影响mapper数量,`hive.exec.reducers.bytes.per.reducer` 控制reducer数量。调整并发和容器复用参数如`hive.server2.tez.sessions.per.default.queue` 和 `tez.am.container.reuse.enabled`
539 0
|
7月前
|
SQL 存储 大数据
Hive的查询、数据加载和交换、聚合、排序、优化
Hive的查询、数据加载和交换、聚合、排序、优化
153 2
|
7月前
|
SQL 存储 分布式计算
【Hive】Hive优化有哪些?
【4月更文挑战第16天】【Hive】Hive优化有哪些?
|
7月前
|
SQL 分布式计算 资源调度
一文看懂 Hive 优化大全(参数配置、语法优化)
以下是对提供的内容的摘要,总长度为240个字符: 在Hadoop集群中,服务器环境包括3台机器,分别运行不同的服务,如NodeManager、DataNode、NameNode等。集群组件版本包括jdk 1.8、mysql 5.7、hadoop 3.1.3和hive 3.1.2。文章讨论了YARN的配置优化,如`yarn.nodemanager.resource.memory-mb`、`yarn.nodemanager.vmem-check-enabled`和`hive.map.aggr`等参数,以及Map-Side聚合优化、Map Join和Bucket Map Join。
387 0
|
7月前
|
SQL 分布式计算 Hadoop
Hive SQL 优化
Hive SQL 优化
101 1
|
7月前
|
SQL 存储 关系型数据库
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
848 0
|
SQL 存储 分布式计算
Hive性能优化之表设计优化2
Hive性能优化之表设计优化2
96 1
下一篇
DataWorks