3.14 Order By 优化
order by 只能是在一个 reduce 进程中进行,所以如果对一个大数据集进行 order by ,会导致一个reduce 进程中处理的数据相当大,造成查询执行缓慢。
1、在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个reduce上进行排序时,那么就在最后的结果集上进行order by。
2、如果是取排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的order by的数据量最多是reduce个数 * N,所以执行效率会有很大提升。
在 Hive 中,关于数据排序,提供了四种语法,一定要区分这四种排序的使用方式和适用场景。
1、order by:全局排序,缺陷是只能使用一个reduce 2、sort by:单机排序,单个reduce结果有序 3、cluster by:对同一字段分桶并排序,不能和sort by 连用 4、distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证每个 reduceTask 结果有序
Hive HQL 中的 order by 与其他 SQL 方言中的功能一样,就是将结果按某字段全局排序,这会导致所有 map 端数据都进入一个 reducer 中,在数据量大时可能会长时间计算不完。
如果使用 sort by,那么还是会视情况启动多个 reducer 进行排序,并且保证每个 reducer 内局部有序。
为了控制map 端数据分配到 reducer 的 key,往往还要配合 distribute by 一同使用。
如果不加distribute by 的话,map 端数据就会随机分配到 reducer。
提供一种方式实现全局排序:两种方式:
1、建表导入数据准备
create table if not exists student(id int, name string, sex string, age int,department string) row format delimited fields terminated by ","; load data local inpath "/home/bigdata/students.txt" into table student;
2、第一种方式
-- 直接使用order by来做。如果结果数据量很大,这个任务的执行效率会非常低 select id,name,age from student order by age desc limit 3;
3、第二种方式
-- 使用distribute by + sort by 多个reduceTask,每个reduceTask分别有序 set mapreduce.job.reduces=3; drop table student_orderby_result; -- 范围分桶 0 < 18 < 1 < 20 < 2 create table student_orderby_result as select * from student distribute by (case when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc)
关于分界值的确定,使用采样的方式,来估计数据分布规律。
3.15 Count Distinct优化
当要统计某一列去重数时,如果数据量很大,count(distinct) 就会非常慢,原因与 order by 类似,count(distinct) 逻辑只会有很少的 reducer 来处理。
这时可以用 group by 来改写:
-- 先 group by 在 count select count(1) from ( select age from student where department >= "MA" group by age ) t;
再来一个例子:
优化前 ,一个普通的只使用一个reduceTask来进行count(distinct) 操作。
-- 优化前(只有一个reduce,先去重再count负担比较大): select count(distinct id) from tablename;
优化后 ,但是这样写会启动两个 MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动 job 的 overhead 远小于计算耗时,才考虑这种方法。
当数据集很小或者 key 的倾斜比较明显时,group by 还可能会比 distinct 慢。
-- 优化后(启动两个job,一个job负责子查询(可以有多个reduce),另一个job负责count(1)): select count(1) from (select distinct id from tablename) tmp; select count(1) from (select id from tablename group by id) tmp; // 推荐使用这种
select t.a, count(t.b) , sum(t.c) from t group by t.a; select t.a, count(distinct t.b, t.c) from t group by t.a;
3.16 怎样写 in/exists 语句
在Hive的早期版本中,in/exists语法是不被支持的,但是从 hive-0.8x 以后就开始支持这个语法。但是不推荐使用这个语法。虽然经过测验,Hive-2.3.6 也支持 in/exists 操作,但还是推荐使用 Hive 的一个高效替代方案:left semi join 。
比如说:
-- in / exists 实现 select a.id, a.name from a where a.id in (select b.id from b); select a.id, a.name from a where exists (select id from b where a.id = b.id);
可以使用 join 来改写:
select a.id, a.name from a join b on a.id = b.id;
应该转换成:
-- left semi join 实现 select a.id, a.name from a left semi join b on a.id = b.id;
3.17 使用 vectorization 技术
在计算类似 scan, filter, aggregation 的时候, vectorization 技术以设置批处理的增量大小为 1024 行单次来达到比单条记录单次获得更高的效率。
set hive.vectorized.execution.enabled=true ; set h/ive.vectorized.execution.reduce.enabled=true;
3.18 多重模式
如果你碰到一堆SQL,并且这一堆SQL的模式还一样。都是从同一个表进行扫描,做不同的逻辑。
有可优化的地方:如果有 n 条SQL,每个SQL执行都会扫描一次这张表
如果一个 HQL 底层要执行 10 个 Job,那么能优化成 8 个一般来说,肯定能有所提高,多重插入就是一个非常实用的技能。一次读取,多次插入,有些场景是从一张表读取数据后,要多次利用,这时可以使用 multi insert 语法。
from sale_detail insert overwrite table sale_detail_multi partition (sale_date='2019',region='china' ) select shop_name, customer_id, total_price where ..... insert overwrite table sale_detail_multi partition (sale_date='2020',region='china' ) select shop_name, customer_id, total_price where .....;
说明:multi insert 语法有一些限制。
1、一般情况下,单个SQL中最多可以写128路输出,超过128路,则报语法错误。 2、在一个multi insert中:对于分区表,同一个目标分区不允许出现多次。对于未分区表,该表不能出现多次。 3、对于同一张分区表的不同分区,不能同时有insert overwrite和insert into操作,否则报错返回。
Multi-Group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。例如
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid = b.userid and a.ds='2019-03-20' )) subq1 INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2019-03-20') SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender INSERT OVERWRITE TABLE school_summary PARTITION(ds='2019-03-20') SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
上述查询语句使用了 Multi-Group by 特性连续 group by 了 2 次数据,使用不同的 Multi-Group by。
这一特性可以减少一次 MapReduce 操作。
3.19 启动中间结果压缩
map 输出压缩
set mapreduce.map.output.compress=true; set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中间数据压缩
中间数据压缩就是对 hive 查询的多个 Job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。
可以采用 snappy 压缩算法,该算法的压缩和解压效率都非常高。
set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK;
结果数据压缩
最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间。
注:常用的 gzip,snappy 压缩算法是不支持并行处理的,如果数据源是 gzip/snappy压缩文件大文件,这样只会有有个 mapper 来处理这个文件,会严重影响查询效率。
所以如果结果数据需要作为其他查询任务的数据源,可以选择支持 splitable 的 LZO 算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高 job 执行的速度了。
set hive.exec.compress.output=true; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.LzoCodec; set mapreduce.output.fileoutputformat.compress.type=BLOCK;
四、Hive 架构层面
4.1 启用本地抓取
Hive 的某些 SQL 语句需要转换成 MapReduce 的操作,某些 SQL 语句就不需要转换成 MapReduce 操作,但是同学们需要注意,理论上来说,所有的 SQL 语句都需要转换成 MapReduce 操作,只不过Hive 在转换 SQL 语句的过程中会做部分优化,使某些简单的操作不再需要转换成 MapReduce,例如:
1、只是 select * 的时候
2、where 条件针对分区字段进行筛选过滤时
3、带有 limit 分支语句时
Hive 从 HDFS 中读取数据,有两种方式:启用MapReduce读取 和 直接抓取。
直接抓取数据比 MapReduce 方式读取数据要快的多,但是只有少数操作可以使用直接抓取方式。
可以通过 hive.fetch.task.conversion 参数来配置在什么情况下采用直接抓取方式:
minimal:只有 select * 、在分区字段上 where 过滤、有 limit 这三种场景下才启用直接抓取方式。
more:在 select、where 筛选、limit 时,都启用直接抓取方式。
查看 Hive 的抓取策略:
## 查看 set hive.fetch.task.conversion;
设置Hive的抓取策略:
## 默认more set hive.fetch.task.conversion=more;
请看 hive-default.xml 中关于这个参数的解释:
<property> <name>hive.fetch.task.conversion</name> <value>more</value> <description> Expects one of [none, minimal, more]. Some select queries can be converted to single FETCH task minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins. 0. none : disable hive.fetch.task.conversion 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns) </description> </property> <property> <name>hive.fetch.task.conversion.threshold</name> <value>1073741824</value> <description> Input threshold for applying hive.fetch.task.conversion. If target table is native, input length is calculated by summation of file lengths. If it's not native, storage handler for the table can optionally implement org.apache.hadoop.hive.ql.metadata.InputEstimator interface. </description> </property>
4.2 本地执行优化
Hive 在集群上查询时,默认是在集群上多台机器上运行,需要多个机器进行协调运行,这种方式很好的解决了大数据量的查询问题。但是在 Hive 查询处理的数据量比较小的时候,其实没有必要启动分布式模式去执行,因为以分布式方式执行设计到跨网络传输、多节点协调等,并且消耗资源。对于小数据集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短。
启动本地模式涉及到三个参数:
## 打开hive自动判断是否启动本地模式的开关 set hive.exec.mode.local.auto=true; ## map任务数最大值,不启用本地模式的task最大个数 set hive.exec.mode.local.auto.input.files.max=4; ## map输入文件最大大小,不启动本地模式的最大输入文件大小 set hive.exec.mode.local.auto.inputbytes.max=134217728;
4.3 JVM重用
Hive 语句最终会转换为一系列的 MapReduce 任务,每一个MapReduce 任务是由一系列的 MapTask和 ReduceTask 组成的,默认情况下,MapReduce 中一个 MapTask 或者 ReduceTask 就会启动一个JVM 进程,一个 Task 执行完毕后,JVM 进程就会退出。这样如果任务花费时间很短,又要多次启动JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗,这时,可以通过重用 JVM 来解决。
set mapred.job.reuse.jvm.num.tasks=5;
JVM也是有缺点的,开启JVM重用会一直占用使用到的 task 的插槽,以便进行重用,直到任务完成后才会释放。
如果某个 不平衡的job 中有几个 reduce task 执行的时间要比其他的 reduce task 消耗的时间要多得多的话,那么保留的插槽就会一直空闲却无法被其他的 job 使用,直到所有的 task 都结束了才会释放。
根据经验,一般来说可以使用一个 cpu core 启动一个 JVM,假如服务器有 16 个 cpu core ,但是这个节点,可能会启动 32 个mapTask,完全可以考虑:启动一个JVM,执行两个Task 。
4.4 并行执行
有的查询语句,Hive 会将其转化为一个或多个阶段,包括:MapReduce 阶段、抽样阶段、合并阶段、limit 阶段等。默认情况下,一次只执行一个阶段。但是,如果某些阶段不是互相依赖,是可以并行执行的。多阶段并行是比较耗系统资源的。
一个 Hive SQL 语句可能会转为多个 MapReduce Job,每一个 job 就是一个 stage,这些 Job 顺序执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的,如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提高了执行速度,但是如果集群资源匮乏时,启用并行化反倒是会导致各个 Job 相互抢占资源而导致整体执行性能的下降。
启用并行化:
## 可以开启并发执行。 set hive.exec.parallel=true; ## 同一个sql允许最大并行度,默认为8。 set hive.exec.parallel.thread.number=16;
4.5 推测执行
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。
为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
# 启动mapper阶段的推测执行机制 set mapreduce.map.speculative=true; # 启动reducer阶段的推测执行机制 set mapreduce.reduce.speculative=true;
建议
如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的MapTask或者ReduceTask的话,那么启动推测执行造成的浪费是非常巨大大。
设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置。
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property> <property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property>
Hive 本身也提供了配置项来控制 reduce-side 的推测执行。
<property> <name>hive.mapred.reduce.tasks.speculative.execution</name> <value>true</value> <description>Whether speculative execution for reducers should be turned on. </description> </property>
4.6 Hive严格模式
所谓严格模式,就是强制不允许用户执行有风险的 HiveQL 语句,一旦执行会直接失败。
但是Hive中为了提高SQL语句的执行效率,可以设置严格模式,充分利用Hive的某些特点。
## 设置Hive的严格模式 set hive.mapred.mode=strict; set hive.exec.dynamic.partition.mode=nostrict;
注意:当设置严格模式之后,会有如下限制:
1、对于分区表,必须添加where对于分区字段的条件过滤 select * from student_ptn where age > 25 2、order by语句必须包含limit输出限制 select * from student order by age limit 100; 3、限制执行笛卡尔积的查询 select a.*, b.* from a, b; 4、在hive的动态分区模式下,如果为严格模式,则必须需要一个分区列式静态分区
五、总结
1、资源不够时才需要调优
资源足够的时候,只需要调大一些资源用量
2、业务优先,运行效率靠后
首先实现业务,有多余精力再考虑调优
3、单个作业最优不如整体最优
全局最优
4、调优不能影响业务运行结果
业务正确性最重要(combiner: avg )
5、调优关注点
架构方面
业务方面
开发方面
资源方面