12.1.2 MapReduce资源配置
MapReduce资源配置主要包括Map Task的内存和CPU核数,以及Reduce Task的内存和CPU核数。核心配置参数如下:
1)mapreduce.map.memory.mb
该参数的含义是,单个Map Task申请的container容器内存大小,其默认值为1024。该值不能超出yarn.scheduler.maximum-allocation-mb和yarn.scheduler.minimum-allocation-mb规定的范围。
该参数需要根据不同的计算任务单独进行配置,在hive中,可直接使用如下方式为每个SQL语句单独进行配置:
set mapreduce.map.memory.mb=2048;
2)mapreduce.map.cpu.vcores
该参数的含义是,单个Map Task申请的container容器cpu核数,其默认值为1。该值一般无需调整。
3)mapreduce.reduce.memory.mb
该参数的含义是,单个Reduce Task申请的container容器内存大小,其默认值为1024。该值同样不能超出yarn.scheduler.maximum-allocation-mb和yarn.scheduler.minimum-allocation-mb规定的范围。
该参数需要根据不同的计算任务单独进行配置,在hive中,可直接使用如下方式为每个SQL语句单独进行配置:
set mapreduce.reduce.memory.mb=2048;
4)mapreduce.reduce.cpu.vcores
该参数的含义是,单个Reduce Task申请的container容器cpu核数,其默认值为1。该值一般无需调整。
12.2 测试用表
12.2.1 订单表(2000w条数据)
1)表结构
2)建表语句
hive (default)>
drop table if exists order_detail;
create table order_detail(
id string comment '订单id',
user_id string comment '用户id',
product_id string comment '商品id',
province_id string comment '省份id',
create_time string comment '下单时间',
product_num int comment '商品件数',
total_amount decimal(16, 2) comment '下单金额'
)
partitioned by (dt string)
row format delimited fields terminated by '\t';
3)数据装载
将order_detail.txt文件上传到hadoop102节点的/opt/module/hive/datas/目录,并执行以下导入语句。
注:文件较大,请耐心等待。
hive (default)>
load data local inpath '/opt/module/hive/datas/order_detail.txt' overwrite into table order_detail partition(dt='2020-06-14');
12.2.2 支付表(600w条数据)
1)表结构
2)建表语句
hive (default)>
drop table if exists payment_detail;
create table payment_detail(
id string comment '支付id',
order_detail_id string comment '订单明细id',
user_id string comment '用户id',
payment_time string comment '支付时间',
total_amount decimal(16, 2) comment '支付金额'
)
partitioned by (dt string)
row format delimited fields terminated by '\t';
3)数据装载
将payment_detail.txt文件上传到hadoop102节点的/opt/module/hive/datas/目录,并执行以下导入语句。
注:文件较大,请耐心等待。
hive (default)>
load data local inpath '/opt/module/hive/datas/payment_detail.txt' overwrite into table payment_detail partition(dt='2020-06-14');
12.2.3 商品信息表(100w条数据)
1)表结构
id (商品id) |
product_name (商品名称) |
price (价格) |
category_id (分类id) |
1000001 |
CuisW |
4517.00 |
219 |
1000002 |
TBtbp |
9357.00 |
208 |
2)建表语句
hive (default)>
drop table if exists product_info;
create table product_info(
id string comment '商品id',
product_name string comment '商品名称',
price decimal(16, 2) comment '价格',
category_id string comment '分类id'
)
row format delimited fields terminated by '\t';
3)数据装载
将product_info.txt文件上传到hadoop102节点的/opt/module/hive/datas/目录,并执行以下导入语句。
hive (default)>
load data local inpath '/opt/module/hive/datas/product_info.txt' overwrite into table product_info;
12.2.4 省份信息表(34条数据)
1)表结构
id (省份id) |
province_name (省份名称) |
1 |
北京 |
2 |
天津 |
2)建表语句
hive (default)>
drop table if exists province_info;
create table province_info(
id string comment '省份id',
province_name string comment '省份名称'
)
row format delimited fields terminated by '\t';
3)数据装载
将province_info.txt文件上传到hadoop102节点的/opt/module/hive/datas/目录,并执行以下导入语句。
hive (default)>
load data local inpath '/opt/module/hive/datas/province_info.txt' overwrite into table province_info;
12.3 Explain查看执行计划(重点)
12.3.1 Explain执行计划概述
Explain呈现的执行计划,由一系列Stage组成,这一系列Stage具有依赖关系,每个Stage对应一个MapReduce Job,或者一个文件系统操作等。
若某个Stage对应的一个MapReduce Job,其Map端和Reduce端的计算逻辑分别由Map Operator Tree和Reduce Operator Tree进行描述,Operator Tree由一系列的Operator组成,一个Operator代表在Map或Reduce阶段的一个单一的逻辑操作,例如TableScan Operator,Select Operator,Join Operator等。
下图是由一个执行计划绘制而成:
常见的Operator及其作用如下:
TableScan:表扫描操作,通常map端第一个操作肯定是表扫描操作
Select Operator:选取操作
Group By Operator:分组聚合操作
Reduce Output Operator:输出到 reduce 操作
Filter Operator:过滤操作
Join Operator:join 操作
File Output Operator:文件输出操作
Fetch Operator :客户端获取数据操作
这里解释一下Fetch Operator,假如在hive里执行了一个select操作,那么会返回一个查询结果,然后这个查询结果其实就是Fetch Operator 从hdfs里的一个临时文件里读出来的(执行select操作后会把hql语句转成MapReduce job,然后MapReduce会去hdfs里查询数据,然后把查询到的数据放到hdfs的一个临时文件里,最后由Fetch Operator 去这个临时文件里去获取数据展示出来)
12.3.2 基本语法
EXPLAIN [FORMATTED | EXTENDED | DEPENDENCY] query-sql
注:FORMATTED、EXTENDED、DEPENDENCY关键字为可选项,各自作用如下。
FORMATTED:将执行计划以JSON字符串的形式输出
EXTENDED:输出执行计划中的额外信息,通常是读写的文件名等信息
DEPENDENCY:输出执行计划读取的表及分区
12.3.3 案例实操
1)查看下面这条语句的执行计划
hive (default)>
explain
select
user_id,
count(*)
from order_detail
group by user_id;
2)执行计划如下图
执行计划案例
explain select age,count(*) from test_user where age>20 group by age;
# 描述了stage之间的依赖关系 STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 //stage0是依赖于stage1的 # stage详细信息 STAGE PLANS: Stage: Stage-1 Map Reduce # MAP端的执行计划树 Map Operator Tree: # 对alias: test_user进行表扫描 TableScan alias: test_user # 当前阶段的统计信息,有行数、数据量 Statistics: Num rows: 4 Data size: 1105 Basic stats: COMPLETE Column stats: NONE # 对TableScan的表进行过滤 Filter Operator # 过滤条件,过滤谓词 # 有些sql隐藏的过滤条件在这里都可以看到,如join是否对null的过滤 predicate: (age > 20) (type: boolean) # 过滤后的信息 Statistics: Num rows: 1 Data size: 276 Basic stats: COMPLETE Column stats: NONE # 这里开启了map端聚合,所以会在map端先聚合 # 查看配置sql如下 # set hive.map.aggr; # 对结果进行groupby Group By Operator # 聚合方式 aggregations: count() # 分组key,分组字段 keys: age (type: int) # 聚合模式 # hash:随机聚合,hash partition; # partial:局部聚合; # final:最终聚合 mode: hash outputColumnNames: _col0, _col1 # 聚合后的信息 Statistics: Num rows: 1 Data size: 276 Basic stats: COMPLETE Column stats: NONE # map端的输出操作,也是一个operator Reduce Output Operator # map阶段输出的kv数据的key key expressions: _col0 (type: int) # 升序排序 sort order: + # map阶段输出时,会进行分区字段 Map-reduce partition columns: _col0 (type: int) # 输出的信息 Statistics: Num rows: 1 Data size: 276 Basic stats: COMPLETE Column stats: NONE # map阶段输出的kv数据的value value expressions: _col1 (type: bigint) # Reduce端的执行计划树 Reduce Operator Tree: # groupby操作 Group By Operator # 聚合方式 aggregations: count(VALUE._col0) # 分组字段 keys: KEY._col0 (type: int) # 聚合方式:局部合并 # complete:表示都在reduce端聚合 mode: mergepartial outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 276 Basic stats: COMPLETE Column stats: NONE File Output Operator # 输出是否压缩 compressed: false Statistics: Num rows: 1 Data size: 276 Basic stats: COMPLETE Column stats: NONE # 当前操作表的信息 table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator # -1为不限制条数 limit: -1 Processor Tree: ListSink
案例地址:https://blog.csdn.net/xiao_yi_xiao/article/details/126744076
12.4 HQL语法优化之分组聚合优化
12.4.1 优化说明
Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率。
如果没有开map-side聚合,那么在map阶段不进行聚合,最后都在reduce阶段进行总聚合,开了map-side聚合的话,那就是会在map阶段先局部聚合,聚合后那么map的数据就会减少,那么shuffle传输给reduce的数据量也会减少,reduce阶段总聚合的数据量也会减少,从而提高效率
map-side 聚合相关的参数如下:
--启用map-side聚合 set hive.map.aggr=true; --用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。 set hive.map.aggr.hash.min.reduction=0.5; --用于检测源表是否适合map-side聚合的条数。也就是上面检查方法中会对若干条数据中的若干条数据的设置 set hive.groupby.mapaggr.checkinterval=100000; --map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。 set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
并不是所有的情况都适合用map-side聚合,例如上面这样的情况,最后聚合后和没聚合之前的数据是一样多的,或者跟没聚合之前差不多,这就不需要做map-side聚合,没啥意义,聚合的一样就是为了减少shuffle的数据量
12.4.2 优化案例
1)示例SQL
hive (default)>
select
product_id,
count(*)
from order_detail
group by product_id;
2)优化前
未经优化的分组聚合,执行计划如下图所示:
3)优化思路
可以考虑开启map-side聚合,配置以下参数:
--启用map-side聚合,默认是true set hive.map.aggr=true; --用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。 set hive.map.aggr.hash.min.reduction=0.5; --用于检测源表是否适合map-side聚合的条数。 set hive.groupby.mapaggr.checkinterval=100000; --map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。 set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
优化后的执行计划如图所示:
对比这两个图就可以发现Map Operator Tree多了一个Group By Operator,也就是在map阶段进行了map-side的局部聚合,然后再reduce阶段进行总聚合
12.5 HQL语法优化之Join优化
12.5.1 Join算法概述
Hive拥有多种join算法,包括Common Join,Map Join,Bucket Map Join,Sort Merge Buckt Map Join等,下面对每种join算法做简要说明:
1)Common Join
Common Join是Hive中最稳定的join算法(默认),其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。如下图所示:
需要注意的是,sql语句中的join操作和执行计划中的Common Join任务并非一对一的关系,一个sql语句中的相邻的且关联字段相同的多个join操作可以合并为一个Common Join任务。
例如:
hive (default)> select a.val, b.val, c.val from a join b on (a.key = b.key1) join c on (c.key = b.key1)
上述sql语句中两个join操作的关联字段均为b表的key1字段,则该语句中的两个join操作可由一个Common Join任务实现,也就是可通过一个Map Reduce任务实现。
hive (default)> select a.val, b.val, c.val from a join b on (a.key = b.key1) join c on (c.key = b.key2)
上述sql语句中的两个join操作关联字段各不相同,则该语句的两个join操作需要各自通过一个Common Join任务实现,也就是通过两个Map Reduce任务实现。
2)Map Join
Map Join算法可以通过两个只有map阶段的Job完成一个join操作。其适用场景为大表join小表。若某join操作满足要求,则第一个Job会读取小表数据,将其制作为hash table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。如下图所示:
3)Bucket Map Join
Bucket Map Join是对Map Join算法的改进,其打破了Map Join只适用于大表join小表的限制,可用于大表join大表的场景。
Bucket Map Join的核心思想是:若能保证参与join的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join操作了。这样一来,第二个Job的Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可。其原理如图所示:
Bucket Map join和Map join差不多,思路基本是一样的,区别就在于Bucket Map join是一个job先把小表的每个桶制作成Hash Table放到hdfs中,然后其他job去hdfs中去一个个的把桶放到自己的内存中,然后在和自己的桶进行join操作,注意不是一次性获取所有的桶,而是一个桶一个桶的来。而Map join 是把整个表制作成一个Hash Table放到hdfs中,Hash Table本质就是分而治之,把大表分为多个桶
4)Sort Merge Bucket Map Join
Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。SMB Map Join要求,参与join的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join同Bucket Join一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行join操作,不同的是,分桶之间的join操作的实现原理。Bucket Map Join,两个分桶之间的join实现原理为Hash Join算法;而SMB Map Join,两个分桶之间的join实现原理为Sort Merge Join算法。
Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单,就是对参与join的一张表构建hash table,然后扫描另外一张表,然后进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行,其原理如图所示:
Hive中的SMB Map Join就是对两个分桶的数据按照上述思路进行Join操作。可以看出,SMB Map Join与Bucket Map Join相比,在进行Join操作时,Map端是无需对整个Bucket构建hash table,也无需在Map端缓存整个Bucket数据的,每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可。
12.5.2 Map Join
12.5.2.1 优化说明
Map Join有两种触发方式,一种是用户在SQL语句中增加hint提示,另外一种是Hive优化器根据参与join表的数据量大小,自动触发。
1)Hint提示
用户可通过如下方式,指定通过map join算法,并且ta将作为map join中的小表。这种方式已经过时,不推荐使用。
hive (default)>
select /*+ mapjoin(ta) */
ta.id,
tb.id
from table_a ta
join table_b tb
on ta.id=tb.id;
2)自动触发
Hive在编译SQL语句阶段,起初所有的join操作均采用Common Join算法实现。
之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务,若满足要求,便将Common Join任务自动转换为Map Join任务。
但有些Common Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行join操作),所以这种Common Join任务是否能转换成Map Join任务在编译阶是无法确定的。
针对这种情况,Hive会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的Map Join任务以及原有的Common Join任务。最终具体采用哪个计划,是在运行时决定的。大致思路如下图所示:
Map join自动转换的具体判断逻辑如下图所示:
图中涉及到的参数如下:
--启动Map Join自动转换 set hive.auto.convert.join=true; --一个Common Join operator转为Map Join operator的判断条件,若该Common Join相关的表中,存在n-1张表的已知大小总和<=该值,则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,实际运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。 set hive.mapjoin.smalltable.filesize=250000; --开启无条件转Map Join set hive.auto.convert.join.noconditionaltask=true; --无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中,存在n-1张表的大小总和<=该值,此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。而是只生成一个最优的Map Join计划。 set hive.auto.convert.join.noconditionaltask.size=10000000;
12.5.2.2 优化案例
1)示例SQL
hive (default)>
select
*
from order_detail od
join product_info product on od.product_id = product.id
join province_info province on od.province_id = province.id;
2)优化前
上述SQL语句共有三张表进行两次join操作,且两次join操作的关联字段不同。故优化前的执行计划应该包含两个Common Join operator,也就是由两个MapReduce任务实现。执行计划如下图所示:
3)优化思路
经分析,参与join的三张表,数据量如下
表名 |
大小 |
order_detail |
1176009934(约1122M) |
product_info |
25285707(约24M) |
province_info |
369(约0.36K) |
注:可使用如下语句获取表/分区的大小信息
hive (default)>
desc formatted table_name partition(partition_col='partition');
三张表中,product_info和province_info数据量较小,可考虑将其作为小表,进行Map Join优化。
根据前文Common Join任务转Map Join任务的判断逻辑图,可得出以下优化方案:
方案一:
启用Map Join自动转换。
hive (default)>
set hive.auto.convert.join=true;
不使用无条件转Map Join。
hive (default)>
set hive.auto.convert.join.noconditionaltask=false;
调整hive.mapjoin.smalltable.filesize参数,使其大于等于product_info。
hive (default)>
set hive.mapjoin.smalltable.filesize=25285707;
这样可保证将两个Common Join operator均可转为Map Join operator,并保留Common Join作为后备计划,保证计算任务的稳定。调整完的执行计划如下图: