内容简要:
一、背景介绍
二、迁移方案
三、引擎差异
四、迁移结果
五、下一步计划
一、背景介绍
滴滴SQL任务调度架构
l 离线计算现状
1)存在Hive和Spark 2套计算引擎;
2)Hive是当前SQL任务主要引擎 - 占83%;
3)Hive平均执行时间20分钟以上;
4)稳定性较差。
l 为什么要启动这个项目
1)更快:Spark比Hive MR更快,平均提速30%以上;
2)更稳:HiveServer隔离性差,SparkSQL一个SQL对应一个App;
3)更强:人力聚焦,对引擎有更强的掌控力。
二、迁移方案
迁徙方案流程图
(一)SQL收集
SQL的收集与提取有两种方式,第一种是通过调度平台收集用户SQL,第二种通过HiveSever2收集用户SQL。第一种方式存在一个问题,调度平台的SQL可以认为是一种模板,并非真正跑在HiveSever2的SQL,因此不能直接执行,所以选择通过HiveSever2收集用户的SQL。
如上图所示,首先第一步通过对HiveHistory做轻微的改变,收集到每个Session中执行的所有Query与Command,并将这些内容保留到HiveSever2的一个本地文件中。
然后第二步,HiveSever2有一个外围脚本,将每天生成的History文件上传到HDFS。
第三步里有一个Parser程序,它主要有三个功能。
过滤:History中不仅包括 SQL,还有Command等其他内容,Parser会过滤无用内容,只保留用户在SQL文件中执行的Command和对应的Query。
去重:用户任务分为小时级别、天级别、周级别,如果用户任务为小时级别,每天重复24次,许多内容是重复的。因此Parser会对一天中执行多次的SQL进行去重。
合并:为了拓展后面迁移Hive任务,Parser会做一个Merge。在一个Shell脚本中,用户可能会启动多个Session来执行SQL,因此需要把任务中执行多次的SQL进行合并。
以上步骤完成后生成SQL文件,以用户的名称命名。
(二)SQL改写&回放
得到生成的SQL文件后,我们将文件进行分析与改写,如下图所示:
SQL改写流程图
分析与改写之后,将进行SQL回放并得到回放结果。回放结果包含状态,例如回放是否成功、运行时间、输出库表的名字等,回放结果如下图所示。
(三)结果对比
根据回放记录信息进行结果对比,主要分为数据对比和性能对比。
数据对比包含Memory、文件大小、文件个数、精度比较;
性能对比包含记录数、数据一致性、运行时间、CPU资源。
其中重点阐述数据一致性与精度比较。
1. 数据一致性
l select sum(hash(c1)), sum(hash(c2)),…from T
1)数据摘要对比, 简单快速;
2)摘要相同,数据不一定一致,反例如下图;
3)假设两个引擎数据差异不大;
4)GROUPING SETS。
2.精度比较
精度比较的核心目的在于比较两张表的不同列是否一致,具体实现过程如下。
结果对比结束后,将对任务状态做划分,可划分为9大类,如下图所示。
可迁移:数据一致,性能有提升,时间和资源有节省;
经验可迁移:可能存在数据不一致的情况,如果数据不一致的原因在经验范围内,也符合迁移状态;
数据不一致:数据不一致的原因可能是条数不一致,某一列不一致;
运行异常:在回放的过程中,任务执行失败;
语法不兼容:使用Spark引擎对SQL语法进行分析时,引擎报错则表明Spark引擎不兼容这种Hive语法。
Time_High、CPU_High、Files_High、Memory_High都是资源多的情况,例如Spark运营时间大于Hive,则认为Time_High。
上述9类状态中,只有“可迁移”与“经验可迁移”符合迁移状态。
(四)引擎切换
获得可迁移任务列表之后,则可执行以下操作进行迁移:
1)整理迁移任务列表和配置参数;
2)调用DataStudio接口修改任务类型为SparkSQL;
3)重跑任务当天批次。
三、引擎差异
引擎差异主要分为三大类:语法差异、UDF差异、性能&功能差异。
(一)语法差异
较为常见的语法差异主要有9种,如下图所示:
(二)UDF差异
UDF差异分为顺序差异,非顺序差异,环境差异。
l 顺序差异
1)collect_set
2)collect_list
3)row_number
4)map类型字段读写
5)sum(double/float)
前4种情况类似,以collect_set为例,如下图所示:
输入表共2列,第一列都是1,第二列是分别是1、2、3。对c1执行Group By之后,对c2执行collect_set,这种情况下两个引擎有可能出现不同结果。
如上图所示, collect_set得出的数据结果可能是231或312,造成数据不同的原因是数据的输入顺序,差异较为直观。
顺序差异中的第5种情况sum(double/float)与其他情况不太相同,举例如下:
单列表中有三个Double类型的值,执行结果可能有两种。
这也是做精度比较的原因,当出现这种问题的话,其实它的精度差异很小,结果是正确的,可以进行迁移。
这种情况出现的原因用Demo表示如下:
l 非顺序差异
1)Datediff
2)unix_timestamp
3)to_date
4)date_add/date_sub
如上所示,非顺序差异体现对在异常日期时间数据处理。
异常数据包括日期月份为0、时间为24点等,处理这种异常数据时,Spark直接返回NULL,而Hive会返回一个值。对于这种情况,由于目前的用户已经习惯Hive的处理方式,我们最终决定以Hive为准,用Hive内置的UDF替换Spark内置的UDF。
l 环境差异
Spark是多线程环境,Hive是单线程环境。、
如上图所示,Spark的一个Executor会同时执行多个Task,因此它是多线程的运营环境。
当遇到一个UDF共享静态变量的情况,由于Hive是单线程运营环境所以没有问题,但在Spark中可能存在线程安全问题,因此将Hive的UDF迁移至Spark时也需要特别注意。如果存在这种情况的话,可以改成私有或者加锁。
(三)性能&功能差异
主要存在以下4方面性能与功能的差异:
l 小文件合并
1)InsertIntoHiveTable中判断是否开启小文件合并;
2)文件的平均大小是否小于阈值则执行合并;
3)执行loadTable或者loadPartition操作。
l Spark SQL支持Cluster模式
1)执行SQL的客户端轻量;
2)不支持交互式。
l 分区剪裁优化
1)支持分区条件中包含concat/concat_ws/substr;
2)识别以上OP并转成Hive的ExprNodeGenericFuncDesc;
3)调用Hive#getPartitionsByExpr;
4)Support multiple types of function partition pruning on hive metastore.
l HiveorcWriter锁优化
BackPortHIVE-10191, 性能提升30%。
四、迁移结果
经过团队半年多努力,完成了一万多个SQL任务迁移,迁移前后总结对比如下:
由上表可知,将SQL任务由HiveSQL迁移至SparkSQL后,带来的收益十分可观。
五、下一步计划
l 迁移Shell类型任务
l Sparkremote shuffleservice
l 升级Spark3.x