1. Introduction
This article is written for people who are not full-time data engineers but use ODPS heavily (data analysts, algorithm engineers, product managers, and so on). The goal is to help you get started with ODPS query optimization quickly, so you can pull data with good performance and avoid the delayed deliverables, and even cluster-stability problems, that stuck or failed SQL jobs cause in daily work.
Does the cluster often feel like it is struggling? Queues grow longer, big SQL jobs crawl along, and sometimes they simply hang... One of the main culprits is the huge, complex, unoptimized SQL we submit. Cluster resources are like a shared one-way highway: when a few "mega trucks" (oversized, inefficient SQL) get on the road, they not only move slowly themselves, they also jam the whole road, so everyone else's "cars" (simple, well-formed SQL) can barely move. The result: everyone's productivity drops while resource costs climb.
As data engineers we fully understand the need to explore data and produce insights, and our goals are the same: get results quickly and reliably. If you also pick up a few ODPS SQL performance-tuning tricks along the way, the payoff is a win for everyone:
- Your jobs run much faster: an optimized SQL job can drop from hours to minutes. The time you used to spend waiting over a coffee is now enough to finish the report.
- Fewer failures and reruns: avoiding full-table scans, data skew, and similar problems greatly reduces the risk of failed jobs and reruns, making your analysis workflow more stable and reliable.
- Precious cluster resources are saved: every optimized SQL turns a "mega truck" into a "Formula 1 car". The CPU, memory, and network bandwidth saved keep the whole cluster running smoothly, with shorter queues and faster jobs for everyone. Do more with less.
- A better experience for the whole team: with spare capacity and a stable environment, both analysts and engineers work more comfortably and efficiently.
That is exactly what this article sets out to do. We skip the deep theory and focus on practical optimization methods you can apply right away:
- Principles made simple: a "courier-station relay" analogy shows how SQL becomes distributed computation inside ODPS and where the "traffic jams" tend to happen.
- Self-check before you write: a few key checks that help you dodge common performance pitfalls before you even submit the SQL.
- See clearly after you run: use Logview as a "dashboard" to monitor the job and quickly pin down what is slow and why.
- Tuning tricks that work: practical strategies and tuning parameters for stubborn problems such as data skew and large-table joins.
- Learn from real cases: real production jobs analyzed and optimized step by step, so the techniques carry over directly to your own work.
Optimizing SQL is not a burden; it is an accelerator. With these skills your analysis jobs finish faster, the whole team saves precious resources, and the cluster truly serves our analytical goals. Let's put the limited cluster capacity where it matters most and let data value flow faster. In the rest of this article we will work through how to write "smarter", more efficient ODPS SQL.
2. Know Your Engine: MapReduce Principles and SQL Execution Logic
2.1 The "divide and conquer" idea behind MapReduce
"Divide and conquer" is a piece of universal wisdom humans have relied on for managing complex systems since ancient times. In the Spring and Autumn period, Guan Zhong's "tax according to land quality" policy divided vast territory into regions governed differently; the Great Wall of Qin was built section by section and then joined end to end into a single defensive system; Roman engineers crossed mountains and valleys by building aqueducts in segments and connecting the modules. All of these embody the core logic of divide and conquer: decompose a whole problem into manageable units, then reach the global goal through parallel progress and coordinated integration.
The idea still shines across modern science. In computer science, familiar algorithms such as quicksort and merge sort are classic applications; in distributed computing it has taken on new life. Google's MapReduce framework (MR for short below) takes "divide and conquer" as its theoretical core: it splits data into shards, computes on them in parallel, and aggregates the results, carrying this old wisdom from the physical world into the digital one and confirming its value across governance, engineering, and computing alike.
When we develop SQL on MaxCompute day to day, the underlying compute engine of ODPS is built on top of MapReduce. MR itself is a parallel programming model, an abstraction, and a complete MR job can usually be broken into four main stages: Split, Map, Shuffle, and Reduce. The overall flow is shown in the figure below:
The Book of Han records "a post every ten li, a relay station every thirty li"; the ancient courier-station network connected the empire like a nervous system and was its vital channel for dispatches. To make the MR flow easier to grasp, let us use wartime dispatches being relayed from the front line back to the capital to demystify MapReduce.
Step 1. Split: break the dispatch into pieces
The front line produces many confidential reports every day (the original file blocks). To prevent interception and decryption, each report is encrypted and broken into several sealed letters (the splits), which are then rushed by horse to the border courier stations (the Map nodes).
Step 2. Map: each station translates its own share
At each border station, the couriers (MapTasks) take the letter fragments assigned to them and immediately do a first-pass translation of the key header information. The couriers work in parallel without interfering with each other, and tag every letter with a category based on that first pass, for example "troop deployment intelligence" or "enemy spy intelligence".
Step 3. Shuffle: sort and forward, interleaved yet orderly
Each border station looks at the category tag on every processed letter and decides which central office (Reduce node) it must go to: troop deployment intelligence all goes to the Ministry of War, spy intelligence goes to the Embroidered Uniform Guard, and so on. All letters bound for the same office are then handed to runners who carry them onward by express horse.
Step 4. Reduce: the center consolidates and reports to the throne
Once a central office has collected all the letter fragments addressed to it, it merges and translates them into the complete report, acts on the contents, and finally sends the resulting memorial (the output file) straight to the palace.
With this courier-relay story, you should now have a rough picture of how MR works. As The Art of War puts it, "managing many is the same as managing few; it is a matter of division." Whether it is courier relays or MapReduce, the essence is "divide first, then conquer": a distributed system decomposes a huge, messy task to raise the efficiency of the whole. Many MR details are still missing from this story; later sections will keep using the courier analogy, together with real SQL, to dig deeper into how MR works under the hood.
2.2 How SQL executes as MapReduce
Back to reality: how exactly does the SQL we submit every day turn into an MR flow? Take the most common kind of metric, a UV count. Suppose we need to compute the daily homepage-feed exposure UV from the recommendation DWD wide table sr_ds.dwd_tb_rec_vst_itm_pv_lead_di. The SQL looks like this:
SQL for daily homepage-feed ("Guess You Like") exposure UV
SELECT  COUNT(visitor_id) AS uv
FROM    (
            SELECT  visitor_id
            FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
            WHERE   ds = '20250401'
            AND     leaf_flow_code IN ('1000000130')
            AND     x_biz_enum NOT IN ('sjgg')
            AND     pv > 0
            GROUP BY visitor_id
        ) t1
;
When we run this SQL in ODPS, the engine automatically generates a corresponding MR execution plan, and once the required compute resources are ready it runs through the four stages described above in order. Ignoring possible optimizations for now, let us walk the most basic, complete path from SQL to MR:
Step 1. Split
As we know, table storage is "chunked" by the partition columns defined at table creation, so only the partitions filtered in the SQL WHERE clause are selected and split. The selected partitions are then cut into n "logical" slices (no physical cutting happens) for the Map stage to process in parallel.
In this example, the partitions of sr_ds.dwd_tb_rec_vst_itm_pv_lead_di matching ds = '20250401', leaf_flow_code IN ('1000000130') and x_biz_enum NOT IN ('sjgg') are selected and split.
Step 2. Map
In the Map stage, each MapTask processes one split, so n splits mean n MapTasks. In MR, a MapTask usually emits each processed record as a key-value pair: the Key identifies the record and therefore determines which ReduceTask it will be sent to; the Value is the record after the Map-side processing logic and may take part in aggregation later in a ReduceTask. An attentive reader may ask: why is Reduce needed at all? Can't the Map stage finish everything? Indeed, for simple SQL logic the Map stage can finish the work and write the output directly. Most of the time, though, the Reduce stage cannot be avoided; the reason is explained below.
In this example the Map work is simple: for each slice, take the visitor_id column as the Key and the number "1" as the Value, and emit the pair, for example (447794,1), (123456,1), and so on.
Step 3. Shuffle
Shuffle literally means reshuffling. It is the bridge between Map and Reduce: it takes the records emitted by each MapTask and assigns them to different ReduceTasks for aggregation, based on the Key and a hash rule. Note that a ReduceTask does not necessarily handle only one Key; in practice a ReduceTask often handles several Keys, which is one way it differs from a MapTask.
Now we can explain properly why Shuffle and Reduce are needed. As the figure shows, the output of the individual MapTasks is unordered, and different MapTasks can emit the same Keys, for example (447794,1) and (123456,1). This is almost inevitable in the Map stage, because we cannot guarantee that the splits contain no overlapping visitor_ids. But our goal is a UV count: since MapTasks have no way of knowing whether other MapTasks saw the same visitor_id, the Map stage alone cannot produce the final result. We therefore need Shuffle to route all records with the same Key to the same ReduceTask, so the Reduce stage can compute the answer we want.
With that analysis, the reason Shuffle exists should be clear. A simple rule of thumb for whether a Shuffle will happen: whenever the SQL contains GROUP BY, JOIN, window functions, EXPLODE, and the like, a Shuffle is very likely.
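As a quick illustration (a hedged sketch reusing the sample table above; the filters are only for demonstration), the first query below is pure filtering and projection and can finish in the Map stage alone, while the second adds a GROUP BY and therefore forces a Shuffle:
-- Map-only: filtering and projection, no aggregation, so no Shuffle is needed
SELECT  visitor_id, item_id
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401' AND pv > 0;

-- Shuffle required: GROUP BY must gather rows with the same visitor_id onto one ReduceTask
SELECT  visitor_id, COUNT(1) AS pv
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401' AND pv > 0
GROUP BY visitor_id;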
Step 4. Reduce
In the "final" Reduce stage, each ReduceTask receives, via Shuffle, all the data for the Keys it is responsible for. Because a ReduceTask may own several Keys, it processes each Key's data with the aggregation logic in turn; once it has finished all the data it owns, it merges and emits its aggregated result.
In this example, identical visitor_ids are shuffled to the same ReduceTask, and since the aggregation we want is a UV count, each ReduceTask only needs to count how many Keys it owns and emit that number.
Is the flow finished at this point? Not quite: we still do not know the final answer. Each ReduceTask has emitted the number of visitor_ids it counted, but the overall COUNT is still a black box to us; we can hardly open the output files one by one and add them up by hand. So after this Reduce there is in fact another Reduce, which we will call the "second Reduce" for now. Its job is to shuffle the outputs of all first-round ReduceTasks into a single ReduceTask, sum them, and write the result file, which gives the final UV metric.
Stepping back from the details, this whole flow can be read as two MR passes, where the Map of the second pass is really the Reduce of the first. MR was never fixed at exactly one Map and one Reduce; it is usually a nested, multi-layer pipeline, and how many passes occur depends on the specific SQL logic. Also, the final Reduce stage does not always end with a single ReduceTask: apart from operators that truly require global aggregation (such as a global ORDER BY or a global COUNT), the Reduce stage may simply write each ReduceTask's result to storage with no further aggregation.
3. Plan Ahead: Identifying SQL Performance Problems
3.1 Prevention first: a self-check before submitting SQL
Now that we roughly understand how SQL becomes MR, we can use features of the SQL itself to predict potential performance problems before submitting the job, instead of digging through run logs afterwards. This saves a great deal of development time and wasted resources.
3.1.1 Full-table scans
As mentioned above, the Split stage can slice only the partitions that are actually needed, avoiding scans and splits of useless partitions and lightening the filtering and processing load of the Map stage. The most direct way to achieve this is to put filters on the table's partition columns in the WHERE clause so that only the required partitions are kept. The step looks trivial, but its effect is dramatic: for a table at the TB-per-day scale, a single partition can easily be hundreds of GB or more. A full-table scan caused by missing partition filters not only inflates the run time sharply or fails the job outright, it also eats abnormal amounts of cluster resources, threatening cluster stability and everyone's productivity.
So the first pre-submit check is: watch out for full-table scans, and guard against missing or ineffective partition filters. Partitions are not limited to the common date partition; different business tables often have their own partition layouts, so check them carefully.
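A minimal before/after sketch (the table is the sample table used earlier; the partition columns are assumed to be ds and leaf_flow_code for illustration):
-- Avoid: no partition filter, so every partition of the table gets scanned and split
SELECT  visitor_id
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   pv > 0;

-- Prefer: filter the partition columns explicitly so only the matching partitions are read
SELECT  visitor_id
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401'
AND     leaf_flow_code = '1000000130'
AND     pv > 0;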
3.1.2 Missing column pruning
Column pruning simply means "take only what you need": drop the unused columns from each source table and read only the columns you will actually use. Partition filtering cuts down the number of rows read; column pruning cuts down the number of columns. Some wide tables have hundreds of columns, and skipping column pruning hurts every MR stage, with especially heavy transfer cost in Shuffle: a dispatch that could have been delivered in one trip now needs several round trips because of all the useless paperwork attached.
So the second pre-submit check is: take column pruning seriously. Prefer listing columns explicitly over reaching for SELECT *.
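A minimal sketch of the same idea (column names as in the earlier example):
-- Avoid: SELECT * drags every column of the wide table through Map, Shuffle and Reduce
SELECT  *
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401';

-- Prefer: keep only the columns the downstream logic actually needs
SELECT  visitor_id, item_id, pv
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401';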
3.1.3 Data skew
Data skew is mentioned constantly, yet its root cause is rarely explained clearly. In one sentence: the problem is not scarcity but unevenness. As described above, both the Map and Reduce stages use many Tasks, each handling part of the data. For various reasons a few Tasks may end up with "a lot" of data while the rest handle only "a little"; the stage's finish time is then dictated entirely by the overloaded Tasks, while the others finish early and sit idle, unable to help. That is data skew.
Much introductory material assumes skew only happens in the Shuffle and Reduce stages, which is not strictly true: the Map stage can skew as well. In practice, though, the physical data layout is rarely extremely irregular, and the Split mechanism tries to give every MapTask a slice of similar size, so as long as the number of MapTasks is not too small, severe Map-side skew is uncommon. Why, then, are Shuffle and Reduce the hot spots for skew?
Take the homepage-feed exposure UV example again. After the Map stage, every record is emitted as a Key-Value pair, and the Key decides which ReduceTask the record is shuffled to. Here the Key is visitor_id. If a handful of unusual high-frequency users (crawler accounts, test accounts) generate massive behavior logs, or there is a pile of dirty rows with a NULL visitor_id, all of those rows land on one corresponding ReduceTask, while the other ReduceTasks only get ordinary users. The data volume gap can be tens, hundreds, or thousands of times: most ReduceTasks finish in ten-odd minutes but must then wait hours for the skewed Task, a classic case of severe skew.
Once you know the root cause, you can prevent it in advance. Skew concentrates in the Shuffle and Reduce stages, which are usually triggered by GROUP BY or JOIN, so be especially alert about the Key columns of GROUP BY and JOIN. Besides user ids, columns such as hot seller/shop ids, item ids, and category ids are classic skew sources; paying attention up front reduces rework after the job is submitted and keeps cluster resources available.
So the third pre-submit check is: watch out for potential data skew. Identify the high-frequency skew patterns in advance so you can design bucketing, filtering, caching, and similar strategies and eliminate the risk at the source.
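A cheap way to check a candidate GROUP BY / JOIN key before running the real job is to profile its distribution on one partition first (a hedged sketch; the limit is arbitrary):
-- A handful of keys (or NULLs) owning a disproportionate share of rows is a strong skew signal
SELECT  visitor_id
        ,COUNT(1) AS cnt
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401'
GROUP BY visitor_id
ORDER BY cnt DESC
LIMIT   100;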
3.2 Read the signs: monitoring after the SQL is submitted
Besides predicting potential problems while writing SQL, we must not let our guard down after submitting it. Many problems, especially data skew, are hard to foresee before the job actually runs, so we need to analyze the execution of the specific job to truly stay in control, and Logview is the indispensable tool for that step.
3.2.1 Anatomy of Logview
After running a SQL job in DataStudio, the log output contains a Logview entry link like the one shown below:
Clicking it opens the Logview job detail page:
The figure divides the detail page into three areas: the Basic Info area, the title/function bar, and the job detail area. Here we focus on how to read the DAG (Directed Acyclic Graph) in the job detail area and the per-stage Task details.
The DAG is simply a logical map and visual rendering of the MR execution flow; once you can read the DAG you can read the whole MR process. Continuing with the homepage-feed exposure UV example, after submitting the SQL the DAG in the job detail area looks like this:
The grey box shows the partition data this SQL needs and can be read as the Split stage; it also lists some of the partition information and the total number of rows scanned.
The "M1" box is the first task stage of this MR flow, the Map stage. You can see 1721 MapTask instances in total; R/W shows the total rows read and written by the stage.
"R2_1" and "R3_2" are slightly different: the prefixes "R2" and "R3" mean these are the second and third task stages, both Reduce stages, while the suffixes "_1" and "_2" mean they depend on the first and second task stage respectively. A stage can only start after the stage it depends on has completely finished, i.e. "R2_1" waits for "M1" and "R3_2" waits for "R2_1". As before, R/W shows total rows read and written; "R2_1" has 287 ReduceTask instances and "R3_2" has just 1.
The arrows between Map and Reduce, and between Reduce and Reduce, are of course the Shuffle stage.
Besides the "M" and "R" stages, ODPS also marks JOINs as their own stage, shown in the DAG as "J", which makes it easy to locate the JOIN. A "J" stage is essentially a special kind of Reduce, and problems there are diagnosed the same way, so we will not cover it separately.
Because the MR flow in this example has finished, the Map and Reduce boxes are green; other colors indicate other states. The overall flow shown by the DAG matches the analysis in section 2.2 almost exactly: "R2_1" is the first Reduce and "R3_2" is the second. With the basics of the DAG understood we can start analyzing more complex jobs, but the DAG alone still cannot tell us whether the SQL suffers from skew; for that we must drill into the Task instances of each stage.
Scrolling further down the job detail area, we see the content shown below:
The two parts outlined in red deserve the most attention. The first box, "Fuxi Jobs", is the stage-level overview mentioned above, i.e. an overview of the Map and Reduce stages, showing each stage's total input/output rows and bytes, status, run time, and so on. The second box, "Fuxi Instance of Fuxi Task", lists the details of every Task instance within a stage, i.e. the working state of each "courier". When you click different stages in the DAG or in "Fuxi Jobs", this part switches accordingly; for example, with "M1" selected the Tasks shown are MapTasks, with information similar to the overview. The figure below shows how the three blocks of the job detail area map to one another:
3.2.2 Locating SQL problems with Logview
With the structure of Logview understood, we can judge from the job's execution whether the SQL needs improving. Here we only cover how to recognize a few common problems and map them back to the corresponding place in the SQL; the concrete fixes are covered in detail in the following chapter.
3.2.2.1 MapTask resource problems
As noted earlier, the number of Map-stage Tasks is determined by how many splits the data was cut into. Without any tuning, a MapTask count below 10000 generally needs no special handling; in the example above there were 1721 MapTasks.
When the number is clearly too large, it should be adjusted promptly. Roughly speaking, more MapTasks means more machines and more CPU, and therefore heavier pressure on cluster resources. For jobs that are not urgent, consider letting each MapTask handle more data to relieve that pressure.
Each MapTask is allocated a certain amount of CPU for its slice. More CPU means faster processing, but in most cases there is no need to change the allocation, because both over- and under-allocation can hurt overall cluster health. If the Map stage as a whole is slow, yet the MapTask count is normal and there is no obvious skew inside the stage, first check whether the Map-side processing logic in the SQL is too complex and simplify it. Only if the cost genuinely cannot be reduced at the SQL level should you consider giving each MapTask a little more CPU to speed up processing and shorten the time resources are held.
3.2.2.2 Reduce data skew problems
Data skew was described in detail in section 3.1.3. Beyond the high-frequency patterns you can prevent up front, many latent skew problems can only be diagnosed with Logview. In the Task detail view of a Reduce stage, clicking the "Long-Tails" button shown below lists the long-tail Task instances directly:
In the figure, the first Reduce stage has 2 long-tail Tasks. How do we decide whether these long tails are caused by data skew? Following the definition, compare the number of records these Tasks processed with that of the other Tasks: an obvious order-of-magnitude gap means the long tail is caused by skew. Otherwise, a long-tail node may simply have run a little longer because of resource or cluster issues rather than skew.
Once a skewed Reduce stage is identified, we can work backwards from where it occurs to the problem spot in the SQL. For simple jobs you can usually tell straight away which JOIN or GROUP BY is responsible; for more complex jobs, keep using Logview. After pinpointing the skewed Reduce stage, first find its upstream dependency in the DAG, because the data a skewed Reduce stage receives was shuffled from that upstream stage. In our example, if "R2_1" is confirmed to be skewed, its upstream is "M1". Double-clicking the upstream node shows the stage's internal operator pipeline, as in the figure below:
Then click the trailing "StreamLineWrite" operator to see the detailed parameters shown on the left of the figure. The information in the red box is what matters: keys and partitions, which are exactly what the Shuffle is based on. In this example visitor_id is the Key causing the skew, and combined with the SQL we can conclude that "GROUP BY visitor_id" is the root cause. Data skew is really a family of problems: GROUP BY and JOIN each have several scenarios and corresponding fixes, all covered in chapter 4. For now it is enough to be able to locate the offending code with Logview.
3.2.2.3 ReduceTask resource problems
After ruling out skew in the Reduce stage, if each ReduceTask still processes a large amount of data and takes long without any long tail, consider increasing the number of ReduceTasks moderately to lower the load per Task and raise parallelism.
Likewise, if a Reduce stage shows none of the above anomalies but is unusually compute-hungry and slow, you can give the Reduce stage a bit more CPU or more memory per instance; for most SQL jobs, though, neither of these normally needs touching.
4. Into Battle: Practical SQL Tuning Strategies
4.1 Tuning the Map stage
4.1.1 Too many MapTasks
When there are too many MapTasks, go straight to the Split process for a fix: the MapTask count cannot be set directly and is determined entirely by the number of splits.
First look at the input size of each MapTask. By default the Split size threshold is 256 MB, so if many MapTasks have inputs far below 256 MB, the source data is stored as many small files. By default Split does not merge small files before slicing, so every small file needs its own MapTask, which inflates the MapTask count. In that case, use this parameter to merge small files:
set odps.sql.mapper.merge.limit.size=64;
Effect: sets the maximum size threshold below which files are merged before splitting; files smaller than the threshold are merged and then sliced. Unit: MB. Default: 64. Adjustable within [0, Integer.MAX_VALUE].
If the MapTask input sizes show no obvious small-file pattern, consider raising the Split size instead, so each slice, and therefore each MapTask, handles more data, which also lowers the MapTask count:
set odps.sql.mapper.split.size=256;
Effect: sets the maximum data input of a MapTask; use it to control the Split size. Unit: MB. Default: 256. Adjustable within [1, Integer.MAX_VALUE]. A larger value lowers parallelism; a smaller value raises it.
The parameters above act on the whole job. For finer control you can use a SPLIT SIZE HINT at the table level, for example:
select a.key from src a /*+split_size(128)*/ join src2 b on a.key=b.key;
Effect: sets the split size to 128 MB, so that tasks are sliced in 128 MB chunks when reading table src. The value is in MB, default 256 MB; adjusting in multiples of 256 MB (e.g. 128 MB, 512 MB) is recommended.
Note that if a SQL statement reads the same table more than once, the hints are merged to the smallest value specified: with two reads of src hinted at 1 MB and 10 MB, the table ends up split by 1 MB.
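For example (a hypothetical self-join of the src table from above, purely to illustrate the merge rule):
-- Both scans read src; the 1 MB and 10 MB hints are merged, and src is split by 1 MB
select a.key from src a /*+split_size(1)*/ join src b /*+split_size(10)*/ on a.key = b.key;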
4.1.2 Insufficient MapTask CPU (adjust with caution)
If insufficient MapTask CPU is the issue and the complexity genuinely cannot be reduced at the SQL level, you can give each MapTask a slightly larger CPU quota:
set odps.sql.mapper.cpu=100;
Effect: sets the CPU quota of each Map Task instance. Default: 100. Adjustable within [50, 800].
4.2 Tuning data skew
4.2.1 GROUP BY skew
Suppose the requirement is: compute daily item-level metrics for items exposed in the homepage feed, based on the recommendation DWD wide table sr_ds.dwd_tb_rec_vst_itm_pv_lead_di. The straightforward SQL is:
SQL for item-level exposure metrics in the homepage feed
SELECT  item_id
        ,COUNT(1) AS pv
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401'
AND     leaf_flow_code IN ('1000000130')
AND     x_biz_enum NOT IN ('sjgg')
AND     pv > 0
GROUP BY item_id
;
While running this code we find obvious data skew in the Reduce stage. Using the methods from the previous chapter we quickly locate the root cause, "GROUP BY item_id", and infer that a few hot items are exposed far more often than the rest. Depending on how severe the skew is and the concrete scenario, the following approaches can help:
4.2.1.1 Map-side pre-aggregation
Looking at the root cause, the problem arises precisely because detail rows are shuffled by Key to the Reduce stage for aggregation, which is what makes the data per ReduceTask uneven. Can we shrink the data before the Shuffle? Yes: if each MapTask first does a "preliminary aggregation" of its own data by the GROUP key and then ships those partial results onward keyed by the same GROUP key, every ReduceTask still owns the same set of Keys but receives far fewer rows per Key, which avoids the skew. This is "pre-aggregation".
MaxCompute already ships many such optimizations, pre-aggregation among them, so in practice we do not need to enable it explicitly: most built-in aggregate functions pre-aggregate automatically. Take AVG as an example: the Logview below shows that the engine decomposes AVG into a numerator and a denominator that are accumulated separately, and the Reduce stage sums the numerators and denominators from all MapTasks before dividing to get the result. Does this extra Map-side work hurt performance? In the vast majority of cases, no: pre-aggregation not only cuts the Reduce-stage workload and the chance of skew, it also directly reduces Shuffle IO and network transfer time.
The parameter that turns pre-aggregation on or off:
set odps.sql.map.aggr=true/false;
Note that although pre-aggregation helps in most scenarios, a small number of workloads can regress with it, and it may not take effect for some custom UDAFs. So if pre-aggregation has not been disabled and clear skew still shows up, try the manual approaches below.
4.2.1.2 Scattering a few skewed Keys + two-stage aggregation + UNION
For GROUP BY skew caused by a small number of hot or abnormal Keys (in Logview this shows up as a small number of "Long-Tails" Tasks, usually under 20), you can pull out the data for those Keys, scatter the Keys and aggregate them separately in two passes, then UNION the result back with the normally processed data.
How do we identify the skewed Keys? In scenarios where the skewed Keys barely change over time, keep them in a small dimension table and exclude them for separate handling in the processing logic. Where the skewed Keys cannot be predicted, implement the detection yourself: a common method is to sample the source data and rank by the GROUP key to get the top skewed Keys, or to filter skewed Keys with a threshold derived from historical data. Interested readers can look up the details; a sketch of the sampling approach follows.
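A hedged sketch of sampling-based detection (the sample rate, threshold, and limit are arbitrary assumptions to be tuned to your data volume):
-- Sample roughly 1% of the partition and surface the most frequent item_id values as skew candidates
SELECT  item_id
        ,COUNT(1) AS cnt
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401'
AND     RAND() < 0.01
GROUP BY item_id
HAVING  COUNT(1) > 10000
ORDER BY cnt DESC
LIMIT   50;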
With the list of skewed Keys in hand, scatter them manually and aggregate twice to sidestep the skew. Suppose we have found that item_id "12345" and "54321" are clearly skewed relative to the other Keys; the optimized SQL is:
Scattering a few skewed Keys manually, aggregating twice, then UNION
SELECT  item_id
        ,COUNT(1) AS pv
FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
WHERE   ds = '20250401'
AND     leaf_flow_code IN ('1000000130')
AND     x_biz_enum NOT IN ('sjgg')
AND     pv > 0
AND     item_id NOT IN ('12345', '54321')
GROUP BY item_id

UNION ALL

SELECT  SPLIT(item_id_rand, '-')[0] AS item_id
        ,SUM(pv_rand) AS pv
FROM    (
            SELECT  item_id_rand
                    ,COUNT(1) AS pv_rand
            FROM    (
                        SELECT  *
                                -- FLOOR keeps the random suffix within N = 1000 discrete buckets
                                ,CONCAT(item_id, '-', FLOOR(RAND() * 1000)) AS item_id_rand
                        FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
                        WHERE   ds = '20250401'
                        AND     leaf_flow_code IN ('1000000130')
                        AND     x_biz_enum NOT IN ('sjgg')
                        AND     pv > 0
                        AND     item_id IN ('12345', '54321')
                    ) tmp
            GROUP BY item_id_rand
        ) t1
GROUP BY SPLIT(item_id_rand, '-')[0]
;
The core idea: first run one MR pass grouped by the Key with a random suffix attached, which spreads the originally skewed rows evenly across N ReduceTasks and produces per-bucket partial aggregates; then strip the random suffix, restore the original Key, and aggregate again for the final result. In this second pass each restored Key has at most N rows to aggregate, and even if the Keys still differ somewhat in volume the difference is negligible. How large should N be? Too small and skew can remain; too large and the job wastes cluster capacity and slows down. In general the scatter range should match the number of ReduceTasks; 1 to 2 times the ReduceTask count works best, as in the sketch below.
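For instance, if the first-stage aggregation is expected to run with about 500 ReduceTasks, a scatter range of 500 to 1000 buckets is a reasonable starting point (a hedged sketch; the numbers are assumptions, not recommendations for any particular table):
-- Optionally pin the reducer count for the job...
set odps.sql.reducer.instances=500;
-- ...and scatter into roughly twice as many buckets inside the inner query:
-- CONCAT(item_id, '-', FLOOR(RAND() * 1000)) AS item_id_rand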
4.2.1.3 Scattering all Keys + two-stage aggregation
When there are very many long-tail nodes and the row counts across ReduceTasks vary wildly, handling just a subset of skewed Keys becomes inefficient. In that case apply the two-stage aggregation idea to every Key: scatter all Keys and aggregate the partial results, then restore the Keys and aggregate again for the final result. The SQL:
Scattering all Keys manually and aggregating twice
SELECT  SPLIT(item_id_rand, '-')[0] AS item_id
        ,SUM(pv_rand) AS pv
FROM    (
            SELECT  item_id_rand
                    ,COUNT(1) AS pv_rand
            FROM    (
                        SELECT  *
                                -- FLOOR keeps the random suffix within N = 1000 discrete buckets
                                ,CONCAT(item_id, '-', FLOOR(RAND() * 1000)) AS item_id_rand
                        FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_extend_di
                        WHERE   ds = '20250401'
                        AND     leaf_flow_code IN ('1000000130')
                        AND     x_biz_enum NOT IN ('sjgg')
                        AND     pv > 0
                    ) tmp
            GROUP BY item_id_rand
        ) t1
GROUP BY SPLIT(item_id_rand, '-')[0]
;
4.2.1.4 The skewindata parameter
For GROUP BY skew, MaxCompute also offers an automatic optimization parameter:
set odps.sql.groupby.skewindata=true/false;
Effect: enables/disables the GROUP BY skew optimization.
Under the hood this parameter works the same way as 4.2.1.2 "Scattering a few skewed Keys + two-stage aggregation + UNION": it relieves the impact of extremely uneven GROUP key distributions through randomized bucketing and two-phase aggregation. Compared with the manual approach, its main advantage is that the ODPS storage and analysis layer can detect the skewed Keys quickly, saving you the manual skew analysis.
4.2.2 JOIN skew
Suppose the requirement is: take homepage-feed exposure detail rows from the recommendation DWD fact table sr_ds.dwd_tb_rec_vst_itm_pv_lead_di and join on item category dimensions. The straightforward SQL is:
SQL joining homepage-feed exposure detail with item category dimensions
set odps.sql.mapper.memory = 2048;
SELECT  t1.*
        ,t2.cate_tags
FROM    (
            SELECT  *
            FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_di /*+split_size(20480)*/
            WHERE   ds = '20250401'
            AND     leaf_flow_code IN ('1000000130')
            AND     x_biz_enum NOT IN ('sjgg')
            AND     pv > 0
        ) t1
LEFT JOIN
        (
            SELECT  TO_CHAR(leaf_cate_id) AS leaf_cate_id
                    ,cate_tags
            FROM    sr_ds.dim_tb_cate_tags
            WHERE   ds = '20250401'
        ) t2
ON      t1.leaf_cate_id = t2.leaf_cate_id
;
While running this code we find obvious skew in the JOIN stage. Using the methods from chapter 3 we quickly locate the root cause, "LEFT JOIN ON t1.leaf_cate_id = t2.leaf_cate_id", and infer that a few hot categories are exposed far more often than the others. Depending on how severe the skew is and the concrete scenario, the following approaches can help; the overall idea closely mirrors the GROUP BY case:
4.2.2.1 MapJoin (large table JOIN small table)
For GROUP BY skew, pre-aggregation let us do a large share of the work in the Map stage without hurting overall performance. Can a JOIN borrow the same idea? A JOIN essentially shuffles rows with the same Key from its two upstream stages onto the same Reduce node to be joined there; if the join could instead be done directly inside the upstream Tasks, there would be no Shuffle and therefore no skew. Exactly; that is "MapJoin".
MapJoin works by broadcasting the smaller of the two tables (usually a dimension table) in full to every Task of the larger table ahead of time, so that each of those Tasks joins its own data against the broadcast copy. The reason it must be a small table is that the whole table has to fit in each Task's memory; a table that is too large would blow the memory. In effect the JOIN is completed in the "Map" stage, with no Shuffle and no Reduce-stage join at all, which is a big performance win. Using it is simple: add /*+ mapjoin(b) */ right after SELECT, where b is the alias of the small table (or subquery). Applied to the scenario above, the optimized code is:
set odps.sql.mapjoin.memory.max=2048;
SELECT  /*+ mapjoin(t2) */
        t1.*
        ,t2.cate_tags
FROM    (
            SELECT  *
            FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_di /*+split_size(20480)*/
            WHERE   ds = '20250401'
            AND     leaf_flow_code IN ('1000000130')
            AND     x_biz_enum NOT IN ('sjgg')
            AND     pv > 0
        ) t1
LEFT JOIN
        (
            SELECT  TO_CHAR(leaf_cate_id) AS leaf_cate_id
                    ,cate_tags
            FROM    sr_ds.dim_tb_cate_tags
            WHERE   ds = '20250401'
        ) t2
ON      t1.leaf_cate_id = t2.leaf_cate_id
;
MapJoin brings a big performance gain but comes with several caveats. First, the secondary table in the join must be small; the secondary table means the right table of a LEFT OUTER JOIN or the left table of a RIGHT OUTER JOIN. Second, MapJoin limits the size of the small table: by default it must not exceed 512 MB once loaded into memory, though the limit can be raised with a parameter, up to 2048 MB:
set odps.sql.mapjoin.memory.max=2048;
So when a JOIN skews but both tables are too large for a straight MapJoin, consider the approaches below.
4.2.2.2 Distributed MapJoin (large table JOIN medium table)
MapJoin caps the broadcast table at 2048 MB, which rules it out in many scenarios. Is there a MapJoin-like optimization when the secondary table exceeds that limit? Yes: MaxCompute's Distributed MapJoin.
Distributed MapJoin is an upgraded MapJoin for joining a large table to a medium-sized table; both aim to eliminate the Shuffle and sorting on the large-table side. In short, Distributed MapJoin splits the secondary table into multiple shards by the Join Key and broadcasts them to the MapTasks in parallel. A MapTask no longer loads the entire secondary table into memory at once; instead it loads the shard matching the hash of the Join Key and performs a local join. The IO is more frequent, but the join result can be emitted directly in the Map stage with no Reduce stage, which avoids the Shuffle and sidesteps skew. Performance is below MapJoin but far above an ordinary MergeJoin.
As a rule, Distributed MapJoin expects the small-table side to be within [1 GB, 100 GB] and evenly distributed without an obvious long tail; otherwise a single shard can receive too much data, causing OOM (Out Of Memory) and RPC (Remote Procedure Call) timeouts.
To use it, add a hint to the SELECT statement: /*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/. shard_count and replica_count together determine the job's concurrency: concurrency = shard_count * replica_count.
Parameter notes: shard_count=<n>: the number of shards the small table is split into; shards are distributed across the compute nodes. n is usually set to an odd number. Setting it manually is recommended; estimate it from the small table's data volume, targeting roughly [200 MB, 500 MB] of data per shard.
replica_count=<m>: the number of replicas of each shard; the default is 1. Multiple replicas of the same shard reduce access pressure and prevent a single failed node from failing the whole job. When concurrency is high, or the environment is unstable and nodes restart frequently, raise replica_count, typically to 2 or 3.
It can be used in the following ways:
-- Option 1: specify shard_count only (replica_count defaults to 1)
/*+distmapjoin(a(shard_count=5))*/
-- Option 2: specify both shard_count and replica_count
/*+distmapjoin(a(shard_count=5,replica_count=2))*/
Taking option 1 as an example, the earlier query can be optimized like this:
SQL after the DistMapJoin optimization
set odps.sql.mapjoin.memory.max=2048;
SELECT  /*+ DISTMAPJOIN(t2(shard_count=5)) */
        t1.*
        ,t2.cate_tags
FROM    (
            SELECT  *
            FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_di /*+split_size(20480)*/
            WHERE   ds = '20250401'
            AND     leaf_flow_code IN ('1000000130')
            AND     x_biz_enum NOT IN ('sjgg')
            AND     pv > 0
        ) t1
LEFT JOIN
        (
            SELECT  TO_CHAR(leaf_cate_id) AS leaf_cate_id
                    ,cate_tags
            FROM    sr_ds.dim_tb_cate_tags
            WHERE   ds = '20250401'
        ) t2
ON      t1.leaf_cate_id = t2.leaf_cate_id
;
4.2.2.3 SkewJoin (large table JOIN large table)
What if the secondary table grows even larger and neither of the methods above applies? Recall section 4.2.1.2: for skew caused by a small number of hot or abnormal Keys, the general idea is to pull that hot slice out and handle it separately, and the same idea carries over to JOINs. For the hot Keys we do not need the whole secondary table in memory for a MapJoin; we only need the secondary-table rows matching the hot Keys. Since the skewed Keys are usually a tiny fraction of all Keys, the corresponding slice of the secondary table is small and almost always fits the MapJoin constraints, while the remaining normal data goes through an ordinary Merge Join and is UNIONed with the hot-Key results.
Implementing this by hand is not hard, but MaxCompute already provides a ready-made strategy: SkewJoin. SkewJoin obtains the hot keys of the two tables automatically or from user input, computes the join results of hot and non-hot data separately, and merges them, speeding up the overall join. Because it essentially applies MapJoin to the hot data, SkewJoin restricts the supported join types: for an Inner Join either side may be hinted; for Left Join, Semi Join, and Anti Join only the left table may be hinted; for Right Join only the right table; Full Join does not support the SkewJoin hint. Also, the left-side and right-side Join Key types of the hinted join must match, otherwise the hint does not take effect. It can be used in the following ways:
- Option 1: hint the table name (note: hint the table's alias)
select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
- Option 2: hint the table name plus the columns suspected of skew; the case below assumes columns c0 and c1 of table a are skewed
select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
- Option 3: hint the table name, the columns, and the skewed key values (string values need quotes); the example below assumes both (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") are skewed
select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
Specifying the skew values directly (option 3) is definitely more efficient than leaving them unspecified (options 1 and 2). Taking option 2 as an example, the earlier query can be optimized as follows:
SQL after the SkewJoin optimization
SELECT  /*+ SKEWJOIN(t1(leaf_cate_id)) */
        t1.*
        ,t2.cate_tags
FROM    (
            SELECT  *
            FROM    sr_ds.dwd_tb_rec_vst_itm_pv_lead_di /*+split_size(20480)*/
            WHERE   ds = '20250401'
            AND     leaf_flow_code IN ('1000000130')
            AND     x_biz_enum NOT IN ('sjgg')
            AND     pv > 0
        ) t1
LEFT JOIN
        (
            SELECT  TO_CHAR(leaf_cate_id) AS leaf_cate_id
                    ,cate_tags
            FROM    sr_ds.dim_tb_cate_tags
            WHERE   ds = '20250401'
        ) t2
ON      t1.leaf_cate_id = t2.leaf_cate_id
;
With the SkewJoin hint in place, the optimizer first runs an aggregate to find the top 20 hot values; 20 is the default and can be changed with set odps.optimizer.skew.join.topk.num = xx;.
Also note: a join that already carries a MapJoin hint cannot carry a SkewJoin hint as well, and when SkewJoin and MapJoin are used together across several consecutive joins, the first join stage must not be a MapJoin, otherwise the SkewJoin is disabled. If no skewed columns are specified explicitly, SkewJoin defaults to using the first join's Key columns to split off hot data; if skewed columns are specified, SkewJoin splits the main table by those columns before joining and keeps hot and non-hot data separate through all subsequent joins.
4.3 Tuning the Reduce stage
4.3.1 Low ReduceTask parallelism
After confirming there is no data skew (or after the skew has been fixed), if each ReduceTask still processes a large amount of data and takes long, consider raising the instance count above its current value to increase the job's parallelism:
set odps.sql.reducer.instances=-1;
Effect: sets the number of Reduce Task instances. Manual settings are adjustable within [1, 10000]. Without HBO, ODPS can set at most 1111 automatically; the manual maximum is 10000, but staying below 2999 is advisable to avoid OOM (running out of memory).
Similarly, low parallelism in the JOIN stage can be addressed by adjusting its instance count:
set odps.sql.joiner.instances=-1;
Effect: sets the number of Join Task instances. Default: -1. Adjustable within [0, 2000]. Without HBO, ODPS can set at most 1111 automatically and the manual maximum is 2000; with HBO the value may exceed 2000.
4.3.2 Insufficient ReduceTask CPU/memory (adjust with caution)
Insufficient ReduceTask CPU is handled the same way as insufficient MapTask CPU, and again should not be adjusted unless necessary:
set odps.sql.reducer.cpu=100;
Effect: sets the CPU quota of each Reduce Task instance. Default: 100. Adjustable within [50, 800].
Besides lack of CPU, lack of memory can also slow a job to a crawl. The telltale sign is Writer Dumps in the Reduce-stage instances (after the job finishes, search for "writer dumps" on the Logview summary page to check). In that case increase the memory moderately to cut the time spent dumping:
set odps.sql.reducer.memory=1024;
Effect: sets the memory of each Reduce Task instance. Unit: MB. Default: 1024. Adjustable within [256, 12288].
The corresponding parameters for the JOIN stage are:
set odps.sql.joiner.cpu=100;
Effect: sets the CPU quota of each Join Task instance. Default: 100. Adjustable within [50, 800].
set odps.sql.joiner.memory=1024;
Effect: sets the memory of each Join Task instance. Unit: MB. Default: 1024. Adjustable within [256, 12288].
5. Learning from History: Classic Case Studies
5.1 Case 1: no partitioning, full-table scans
The original SQL first:
Case 1 SQL
set odps.sql.mapper.split.size=8192; insert overwrite table sr_ds.dwd_tb_rec_all_reimps_1d partition(ds = '${bizdate}', leaf_flow_code ) select *except(leaf_flow_code) , leaf_flow_code from ( select t1.user_id , t1.item_id , bkt_id_88 , bkt_id_99 , bkt_id_9421 , seller_id , org_brand_id , taggingv5_cate , taggingv5_cate_name , taggingv5_first , tt_cluster_id , tk_cluster_id , xsk_cluster_id , v5_cluster_id , session_id_ut , bkt_id_ad , page_num , isfixpos , is_reserve , app_version , os , os_version , itemorimatchtype , x_biz , x_sub_biz , card_type , card_subtype , rec_sessionid , cate_id , ind1_name , cate_level1_name , cate_level2_name , cate_name , xcat1_id , xcat1_name , xcat2_id , xcat2_name , xcat3_id , xcat3_name , if((local_time-if(local_time=t2.min_today_time_stamp,t2.max_time_stamp,t2.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_item , if((local_time-if(local_time=t2.min_today_time_stamp,t2.max_time_stamp,t2.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_item , if((local_time-if(local_time=t2.min_today_time_stamp,t2.max_time_stamp,t2.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_item , if((local_time-if(local_time=t3.min_today_time_stamp,t3.max_time_stamp,t3.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_item , if((local_time-if(local_time=t3.min_today_time_stamp,t3.max_time_stamp,t3.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_item , if((local_time-if(local_time=t3.min_today_time_stamp,t3.max_time_stamp,t3.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_item , if((local_time-if(local_time=t3.min_today_time_stamp,t3.max_time_stamp,t3.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_item , if((local_time-if(local_time=t4.min_today_time_stamp,t4.max_time_stamp,t4.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_item , if((local_time-if(local_time=t4.min_today_time_stamp,t4.max_time_stamp,t4.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_item , if((local_time-if(local_time=t4.min_today_time_stamp,t4.max_time_stamp,t4.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_item , if((local_time-if(local_time=t4.min_today_time_stamp,t4.max_time_stamp,t4.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_item , if((local_time-if(local_time=t5.min_today_time_stamp,t5.max_time_stamp,t5.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_item , if((local_time-if(local_time=t6.min_today_time_stamp,t6.max_time_stamp,t6.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_seller , if((local_time-if(local_time=t6.min_today_time_stamp,t6.max_time_stamp,t6.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_seller , if((local_time-if(local_time=t6.min_today_time_stamp,t6.max_time_stamp,t6.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_seller , if((local_time-if(local_time=t7.min_today_time_stamp,t7.max_time_stamp,t7.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_seller , 
if((local_time-if(local_time=t7.min_today_time_stamp,t7.max_time_stamp,t7.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_seller , if((local_time-if(local_time=t7.min_today_time_stamp,t7.max_time_stamp,t7.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_seller , if((local_time-if(local_time=t7.min_today_time_stamp,t7.max_time_stamp,t7.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_seller , if((local_time-if(local_time=t8.min_today_time_stamp,t8.max_time_stamp,t8.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_seller , if((local_time-if(local_time=t8.min_today_time_stamp,t8.max_time_stamp,t8.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_seller , if((local_time-if(local_time=t8.min_today_time_stamp,t8.max_time_stamp,t8.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_seller , if((local_time-if(local_time=t8.min_today_time_stamp,t8.max_time_stamp,t8.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_seller , if((local_time-if(local_time=t9.min_today_time_stamp,t9.max_time_stamp,t9.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_seller , if((local_time-if(local_time=t10.min_today_time_stamp,t10.max_time_stamp,t10.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_brand , if((local_time-if(local_time=t10.min_today_time_stamp,t10.max_time_stamp,t10.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_brand , if((local_time-if(local_time=t10.min_today_time_stamp,t10.max_time_stamp,t10.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_brand , if((local_time-if(local_time=t11.min_today_time_stamp,t11.max_time_stamp,t11.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_brand , if((local_time-if(local_time=t11.min_today_time_stamp,t11.max_time_stamp,t11.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_brand , if((local_time-if(local_time=t11.min_today_time_stamp,t11.max_time_stamp,t11.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_brand , if((local_time-if(local_time=t11.min_today_time_stamp,t11.max_time_stamp,t11.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_brand , if((local_time-if(local_time=t12.min_today_time_stamp,t12.max_time_stamp,t12.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_brand , if((local_time-if(local_time=t12.min_today_time_stamp,t12.max_time_stamp,t12.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_brand , if((local_time-if(local_time=t12.min_today_time_stamp,t12.max_time_stamp,t12.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_brand , if((local_time-if(local_time=t12.min_today_time_stamp,t12.max_time_stamp,t12.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=48*60*60,1,0) 
as is_48h_pay_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_brand , if((local_time-if(local_time=t13.min_today_time_stamp,t13.max_time_stamp,t13.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_brand , if((local_time-if(local_time=t14.min_today_time_stamp,t14.max_time_stamp,t14.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_v5cate , if((local_time-if(local_time=t14.min_today_time_stamp,t14.max_time_stamp,t14.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_v5cate , if((local_time-if(local_time=t14.min_today_time_stamp,t14.max_time_stamp,t14.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_v5cate , if((local_time-if(local_time=t15.min_today_time_stamp,t15.max_time_stamp,t15.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_v5cate , if((local_time-if(local_time=t15.min_today_time_stamp,t15.max_time_stamp,t15.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_v5cate , if((local_time-if(local_time=t15.min_today_time_stamp,t15.max_time_stamp,t15.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_v5cate , if((local_time-if(local_time=t15.min_today_time_stamp,t15.max_time_stamp,t15.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_v5cate , if((local_time-if(local_time=t16.min_today_time_stamp,t16.max_time_stamp,t16.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_v5cate , if((local_time-if(local_time=t16.min_today_time_stamp,t16.max_time_stamp,t16.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_v5cate , if((local_time-if(local_time=t16.min_today_time_stamp,t16.max_time_stamp,t16.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_v5cate , if((local_time-if(local_time=t16.min_today_time_stamp,t16.max_time_stamp,t16.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_v5cate , if((local_time-if(local_time=t17.min_today_time_stamp,t17.max_time_stamp,t17.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_v5cate , if((local_time-if(local_time=t18.min_today_time_stamp,t18.max_time_stamp,t18.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_tt , if((local_time-if(local_time=t18.min_today_time_stamp,t18.max_time_stamp,t18.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_tt , if((local_time-if(local_time=t18.min_today_time_stamp,t18.max_time_stamp,t18.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_tt , if((local_time-if(local_time=t19.min_today_time_stamp,t19.max_time_stamp,t19.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_tt , 
if((local_time-if(local_time=t19.min_today_time_stamp,t19.max_time_stamp,t19.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_tt , if((local_time-if(local_time=t19.min_today_time_stamp,t19.max_time_stamp,t19.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_tt , if((local_time-if(local_time=t19.min_today_time_stamp,t19.max_time_stamp,t19.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_tt , if((local_time-if(local_time=t20.min_today_time_stamp,t20.max_time_stamp,t20.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_tt , if((local_time-if(local_time=t20.min_today_time_stamp,t20.max_time_stamp,t20.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_tt , if((local_time-if(local_time=t20.min_today_time_stamp,t20.max_time_stamp,t20.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_tt , if((local_time-if(local_time=t20.min_today_time_stamp,t20.max_time_stamp,t20.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_tt , if((local_time-if(local_time=t21.min_today_time_stamp,t21.max_time_stamp,t21.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_tt , if((local_time-if(local_time=t22.min_today_time_stamp,t22.max_time_stamp,t22.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_tk , if((local_time-if(local_time=t22.min_today_time_stamp,t22.max_time_stamp,t22.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_tk , if((local_time-if(local_time=t22.min_today_time_stamp,t22.max_time_stamp,t22.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_tk , if((local_time-if(local_time=t23.min_today_time_stamp,t23.max_time_stamp,t23.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_tk , if((local_time-if(local_time=t23.min_today_time_stamp,t23.max_time_stamp,t23.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_tk , if((local_time-if(local_time=t23.min_today_time_stamp,t23.max_time_stamp,t23.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_tk , if((local_time-if(local_time=t23.min_today_time_stamp,t23.max_time_stamp,t23.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_tk , if((local_time-if(local_time=t24.min_today_time_stamp,t24.max_time_stamp,t24.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_tk , if((local_time-if(local_time=t24.min_today_time_stamp,t24.max_time_stamp,t24.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_tk , if((local_time-if(local_time=t24.min_today_time_stamp,t24.max_time_stamp,t24.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_tk , if((local_time-if(local_time=t24.min_today_time_stamp,t24.max_time_stamp,t24.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_tk , if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_tk , if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_tk , 
if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_tk , if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_tk , if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_tk , if((local_time-if(local_time=t25.min_today_time_stamp,t25.max_time_stamp,t25.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_tk , if((local_time-if(local_time=t26.min_today_time_stamp,t26.max_time_stamp,t26.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_xsk , if((local_time-if(local_time=t26.min_today_time_stamp,t26.max_time_stamp,t26.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_xsk , if((local_time-if(local_time=t26.min_today_time_stamp,t26.max_time_stamp,t26.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_xsk , if((local_time-if(local_time=t27.min_today_time_stamp,t27.max_time_stamp,t27.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_xsk , if((local_time-if(local_time=t27.min_today_time_stamp,t27.max_time_stamp,t27.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_xsk , if((local_time-if(local_time=t27.min_today_time_stamp,t27.max_time_stamp,t27.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_xsk , if((local_time-if(local_time=t27.min_today_time_stamp,t27.max_time_stamp,t27.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_xsk , if((local_time-if(local_time=t28.min_today_time_stamp,t28.max_time_stamp,t28.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_xsk , if((local_time-if(local_time=t28.min_today_time_stamp,t28.max_time_stamp,t28.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_xsk , if((local_time-if(local_time=t28.min_today_time_stamp,t28.max_time_stamp,t28.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_xsk , if((local_time-if(local_time=t28.min_today_time_stamp,t28.max_time_stamp,t28.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_xsk , if((local_time-if(local_time=t29.min_today_time_stamp,t29.max_time_stamp,t29.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_xsk , if((local_time-if(local_time=t30.min_today_time_stamp,t30.max_time_stamp,t30.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pv_v5cluster , if((local_time-if(local_time=t30.min_today_time_stamp,t30.max_time_stamp,t30.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pv_v5cluster , if((local_time-if(local_time=t30.min_today_time_stamp,t30.max_time_stamp,t30.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pv_v5cluster , if((local_time-if(local_time=t31.min_today_time_stamp,t31.max_time_stamp,t31.min_today_time_stamp))<=24*60*60,1,0) as is_24h_clk_v5cluster , if((local_time-if(local_time=t31.min_today_time_stamp,t31.max_time_stamp,t31.min_today_time_stamp))<=48*60*60,1,0) as is_48h_clk_v5cluster , 
if((local_time-if(local_time=t31.min_today_time_stamp,t31.max_time_stamp,t31.min_today_time_stamp))<=72*60*60,1,0) as is_72h_clk_v5cluster , if((local_time-if(local_time=t31.min_today_time_stamp,t31.max_time_stamp,t31.min_today_time_stamp))<=168*60*60,1,0) as is_7d_clk_v5cluster , if((local_time-if(local_time=t32.min_today_time_stamp,t32.max_time_stamp,t32.min_today_time_stamp))<=24*60*60,1,0) as is_24h_interact_v5cluster , if((local_time-if(local_time=t32.min_today_time_stamp,t32.max_time_stamp,t32.min_today_time_stamp))<=48*60*60,1,0) as is_48h_interact_v5cluster , if((local_time-if(local_time=t32.min_today_time_stamp,t32.max_time_stamp,t32.min_today_time_stamp))<=72*60*60,1,0) as is_72h_interact_v5cluster , if((local_time-if(local_time=t32.min_today_time_stamp,t32.max_time_stamp,t32.min_today_time_stamp))<=168*60*60,1,0) as is_7d_interact_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=24*60*60,1,0) as is_24h_pay_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=48*60*60,1,0) as is_48h_pay_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=72*60*60,1,0) as is_72h_pay_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=168*60*60,1,0) as is_7d_pay_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=360*60*60,1,0) as is_15d_pay_v5cluster , if((local_time-if(local_time=t33.min_today_time_stamp,t33.max_time_stamp,t33.min_today_time_stamp))<=720*60*60,1,0) as is_30d_pay_v5cluster -- 推荐场域 , if((local_time-if(local_time=t2.min_today_time_stamp_rec,t2.max_time_stamp_rec,t2.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_item_rec , if((local_time-if(local_time=t2.min_today_time_stamp_rec,t2.max_time_stamp_rec,t2.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_item_rec , if((local_time-if(local_time=t2.min_today_time_stamp_rec,t2.max_time_stamp_rec,t2.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_item_rec , if((local_time-if(local_time=t3.min_today_time_stamp_rec,t3.max_time_stamp_rec,t3.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_item_rec , if((local_time-if(local_time=t3.min_today_time_stamp_rec,t3.max_time_stamp_rec,t3.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_item_rec , if((local_time-if(local_time=t3.min_today_time_stamp_rec,t3.max_time_stamp_rec,t3.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_item_rec , if((local_time-if(local_time=t3.min_today_time_stamp_rec,t3.max_time_stamp_rec,t3.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_item_rec , if((local_time-if(local_time=t6.min_today_time_stamp_rec,t6.max_time_stamp_rec,t6.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_seller_rec , if((local_time-if(local_time=t6.min_today_time_stamp_rec,t6.max_time_stamp_rec,t6.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_seller_rec , if((local_time-if(local_time=t6.min_today_time_stamp_rec,t6.max_time_stamp_rec,t6.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_seller_rec , if((local_time-if(local_time=t7.min_today_time_stamp_rec,t7.max_time_stamp_rec,t7.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_seller_rec , if((local_time-if(local_time=t7.min_today_time_stamp_rec,t7.max_time_stamp_rec,t7.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_seller_rec , 
if((local_time-if(local_time=t7.min_today_time_stamp_rec,t7.max_time_stamp_rec,t7.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_seller_rec , if((local_time-if(local_time=t7.min_today_time_stamp_rec,t7.max_time_stamp_rec,t7.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_seller_rec , if((local_time-if(local_time=t10.min_today_time_stamp_rec,t10.max_time_stamp_rec,t10.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_brand_rec , if((local_time-if(local_time=t10.min_today_time_stamp_rec,t10.max_time_stamp_rec,t10.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_brand_rec , if((local_time-if(local_time=t10.min_today_time_stamp_rec,t10.max_time_stamp_rec,t10.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_brand_rec , if((local_time-if(local_time=t11.min_today_time_stamp_rec,t11.max_time_stamp_rec,t11.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_brand_rec , if((local_time-if(local_time=t11.min_today_time_stamp_rec,t11.max_time_stamp_rec,t11.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_brand_rec , if((local_time-if(local_time=t11.min_today_time_stamp_rec,t11.max_time_stamp_rec,t11.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_brand_rec , if((local_time-if(local_time=t11.min_today_time_stamp_rec,t11.max_time_stamp_rec,t11.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_brand_rec , if((local_time-if(local_time=t14.min_today_time_stamp_rec,t14.max_time_stamp_rec,t14.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_v5cate_rec , if((local_time-if(local_time=t14.min_today_time_stamp_rec,t14.max_time_stamp_rec,t14.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_v5cate_rec , if((local_time-if(local_time=t14.min_today_time_stamp_rec,t14.max_time_stamp_rec,t14.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_v5cate_rec , if((local_time-if(local_time=t15.min_today_time_stamp_rec,t15.max_time_stamp_rec,t15.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_v5cate_rec , if((local_time-if(local_time=t15.min_today_time_stamp_rec,t15.max_time_stamp_rec,t15.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_v5cate_rec , if((local_time-if(local_time=t15.min_today_time_stamp_rec,t15.max_time_stamp_rec,t15.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_v5cate_rec , if((local_time-if(local_time=t15.min_today_time_stamp_rec,t15.max_time_stamp_rec,t15.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_v5cate_rec , if((local_time-if(local_time=t18.min_today_time_stamp_rec,t18.max_time_stamp_rec,t18.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_tt_rec , if((local_time-if(local_time=t18.min_today_time_stamp_rec,t18.max_time_stamp_rec,t18.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_tt_rec , if((local_time-if(local_time=t18.min_today_time_stamp_rec,t18.max_time_stamp_rec,t18.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_tt_rec , if((local_time-if(local_time=t19.min_today_time_stamp_rec,t19.max_time_stamp_rec,t19.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_tt_rec , if((local_time-if(local_time=t19.min_today_time_stamp_rec,t19.max_time_stamp_rec,t19.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_tt_rec , if((local_time-if(local_time=t19.min_today_time_stamp_rec,t19.max_time_stamp_rec,t19.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_tt_rec , if((local_time-if(local_time=t19.min_today_time_stamp_rec,t19.max_time_stamp_rec,t19.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_tt_rec , 
if((local_time-if(local_time=t22.min_today_time_stamp_rec,t22.max_time_stamp_rec,t22.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_tk_rec , if((local_time-if(local_time=t22.min_today_time_stamp_rec,t22.max_time_stamp_rec,t22.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_tk_rec , if((local_time-if(local_time=t22.min_today_time_stamp_rec,t22.max_time_stamp_rec,t22.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_tk_rec , if((local_time-if(local_time=t23.min_today_time_stamp_rec,t23.max_time_stamp_rec,t23.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_tk_rec , if((local_time-if(local_time=t23.min_today_time_stamp_rec,t23.max_time_stamp_rec,t23.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_tk_rec , if((local_time-if(local_time=t23.min_today_time_stamp_rec,t23.max_time_stamp_rec,t23.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_tk_rec , if((local_time-if(local_time=t23.min_today_time_stamp_rec,t23.max_time_stamp_rec,t23.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_tk_rec , if((local_time-if(local_time=t26.min_today_time_stamp_rec,t26.max_time_stamp_rec,t26.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_xsk_rec , if((local_time-if(local_time=t26.min_today_time_stamp_rec,t26.max_time_stamp_rec,t26.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_xsk_rec , if((local_time-if(local_time=t26.min_today_time_stamp_rec,t26.max_time_stamp_rec,t26.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_xsk_rec , if((local_time-if(local_time=t27.min_today_time_stamp_rec,t27.max_time_stamp_rec,t27.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_xsk_rec , if((local_time-if(local_time=t27.min_today_time_stamp_rec,t27.max_time_stamp_rec,t27.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_xsk_rec , if((local_time-if(local_time=t27.min_today_time_stamp_rec,t27.max_time_stamp_rec,t27.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_xsk_rec , if((local_time-if(local_time=t27.min_today_time_stamp_rec,t27.max_time_stamp_rec,t27.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_xsk_rec , if((local_time-if(local_time=t30.min_today_time_stamp_rec,t30.max_time_stamp_rec,t30.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_pv_v5cluster_rec , if((local_time-if(local_time=t30.min_today_time_stamp_rec,t30.max_time_stamp_rec,t30.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_pv_v5cluster_rec , if((local_time-if(local_time=t30.min_today_time_stamp_rec,t30.max_time_stamp_rec,t30.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_pv_v5cluster_rec , if((local_time-if(local_time=t31.min_today_time_stamp_rec,t31.max_time_stamp_rec,t31.min_today_time_stamp_rec))<=24*60*60,1,0) as is_24h_clk_v5cluster_rec , if((local_time-if(local_time=t31.min_today_time_stamp_rec,t31.max_time_stamp_rec,t31.min_today_time_stamp_rec))<=48*60*60,1,0) as is_48h_clk_v5cluster_rec , if((local_time-if(local_time=t31.min_today_time_stamp_rec,t31.max_time_stamp_rec,t31.min_today_time_stamp_rec))<=72*60*60,1,0) as is_72h_clk_v5cluster_rec , if((local_time-if(local_time=t31.min_today_time_stamp_rec,t31.max_time_stamp_rec,t31.min_today_time_stamp_rec))<=168*60*60,1,0) as is_7d_clk_v5cluster_rec , local_time , index_id , pvid , leaf_flow_code from ( select * from sr_ds.dwd_tb_rec_sc_session_reimps_1d where ds = '${bizdate}' -- and leaf_flow_code in ( '1000000130' ) )t1 left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='item' )t2 on t1.user_id = t2.user_id and t1.item_id = t2.id left join ( select * from 
sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='item' )t3 on t1.user_id = t3.user_id and t1.item_id = t3.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='item' )t4 on t1.user_id = t4.user_id and t1.item_id = t4.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='item' )t5 on t1.user_id = t5.user_id and t1.item_id = t5.id ------seller------------- left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='seller' )t6 on t1.user_id = t6.user_id and t1.seller_id = t6.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='seller' )t7 on t1.user_id = t7.user_id and t1.seller_id = t7.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='seller' )t8 on t1.user_id = t8.user_id and t1.seller_id = t8.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='seller' )t9 on t1.user_id = t9.user_id and t1.seller_id = t9.id --brand_id left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='brand' )t10 on t1.user_id = t10.user_id and t1.org_brand_id = t10.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='brand' )t11 on t1.user_id = t11.user_id and t1.org_brand_id = t11.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='brand' )t12 on t1.user_id = t12.user_id and t1.org_brand_id = t12.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='brand' )t13 on t1.user_id = t13.user_id and t1.org_brand_id = t13.id --taggingV5 left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='taggingV5' )t14 on t1.user_id = t14.user_id and t1.taggingv5_cate = t14.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='taggingV5' )t15 on t1.user_id = t15.user_id and t1.taggingv5_cate = t15.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='taggingV5' )t16 on t1.user_id = t16.user_id and t1.taggingv5_cate = t16.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='taggingV5' )t17 on t1.user_id = t17.user_id and t1.taggingv5_cate = t17.id --tt left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='tt' )t18 on t1.user_id = t18.user_id and t1.tt_cluster_id = t18.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='tt' )t19 on t1.user_id = t19.user_id and t1.tt_cluster_id = t19.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='tt' )t20 on t1.user_id = t20.user_id and t1.tt_cluster_id = t20.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='tt' )t21 on t1.user_id = t21.user_id and t1.tt_cluster_id = t21.id --tk left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='tk' )t22 on t1.user_id = t22.user_id and t1.tk_cluster_id = t22.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='tk' )t23 on t1.user_id = t23.user_id and t1.tk_cluster_id = t23.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = 
'${bizdate}' and type='tk' )t24 on t1.user_id = t24.user_id and t1.tk_cluster_id = t24.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='tk' )t25 on t1.user_id = t25.user_id and t1.tk_cluster_id = t25.id --xsk left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='xsk' )t26 on t1.user_id = t26.user_id and t1.xsk_cluster_id = t26.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='xsk' )t27 on t1.user_id = t27.user_id and t1.xsk_cluster_id = t27.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='xsk' )t28 on t1.user_id = t28.user_id and t1.xsk_cluster_id = t28.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='xsk' )t29 on t1.user_id = t29.user_id and t1.xsk_cluster_id = t29.id --v5_cluster_id left join ( select * from sr_ds.dwd_user_allnet_pv_3d_group where ds = '${bizdate}' and type='v5_cluster' )t30 on t1.user_id = t30.user_id and t1.v5_cluster_id = t30.id left join ( select * from sr_ds.dwd_user_allnet_clk_7d_group where ds = '${bizdate}' and type='v5_cluster' )t31 on t1.user_id = t31.user_id and t1.v5_cluster_id = t31.id left join ( select * from sr_ds.dwd_user_allnet_interact_7d_group where ds = '${bizdate}' and type='v5_cluster' )t32 on t1.user_id = t32.user_id and t1.v5_cluster_id = t32.id left join ( select * from sr_ds.dwd_user_allnet_pay_30d_group where ds = '${bizdate}' and type='v5_cluster' )t33 on t1.user_id = t33.user_id and t1.v5_cluster_id = t33.id )
The code is long; set aside its purpose and its internal logic for now. The job contains 32 JOINs, with essentially 5 source tables involved: apart from the main table, each secondary table is filtered by the `type` column into 8 sub-tables, giving 32 secondary inputs in total. The DAG of a run looks like this:
The DAG shows 8 JOIN nodes, which means 8 distinct JOIN Keys, corresponding to the 8 enum values of the `type` filter. Picking any one JOIN block from the code:
The other 7 filters and JOIN Keys look the same. The code is long, but the logic itself is very simple. Still waters run deep, though: open one of the source tables and look at its schema and partition information:
This is hard to stay calm about: a daily full-snapshot table with over 6 TB of storage per day has no partition column besides the date? If there were no natural partitioning condition, that would be understandable, but in this job all 4 of these source tables are filtered by `type`, and a check shows their other downstream jobs also filter on `type`. Why, then, is `type` not a partition column as well? This is clearly a case of missing partition design forcing full-table scans on every reader; the impact on cluster resources and stability needs no further elaboration.
Does the damage stop there? As this job gains more and more downstream consumers and more business lines plug in, the requirements on output latency and stability keep tightening, and the future cost of fixing a foundation-level issue like an unpartitioned base table becomes very high. Take this job as an example: suppose its latency now has to be improved, which requires analyzing its execution and devising an optimization plan. From the DAG alone we simply cannot tell how large each JOIN's secondary input is; even if we painstakingly measured one day's volumes, we could not base a plan on a single day, and repeating the measurement over many days to get a reliable distribution is so inefficient as to be practically infeasible. Had the base table been partitioned properly from the start, we could read the daily partition-level output statistics from the data map and sketch the optimization direction in minutes; beyond the efficiency gain, it turns the task from "impossible" into "possible". And if we now decide to bite the bullet and refactor the base table, we must coordinate with upstream and downstream owners, back up data, backfill history, and rewire scheduling dependencies, all of it extra toil. A sketch of what a partitioned layout could look like follows.
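For reference, a hedged sketch of a more query-friendly layout (the table and column names here are illustrative, not the real DDL): adding `type` as a second-level partition column lets every downstream job prune to exactly the type values it needs instead of scanning the whole day:
CREATE TABLE IF NOT EXISTS sr_ds.dwd_user_allnet_pv_3d_group_pt
(
    user_id STRING
    ,id     STRING
    -- ...plus the remaining business columns...
)
PARTITIONED BY (ds STRING, type STRING);

-- Downstream reads then prune by partition instead of filtering a 6 TB daily snapshot:
SELECT  user_id, id
FROM    sr_ds.dwd_user_allnet_pv_3d_group_pt
WHERE   ds = '${bizdate}' AND type = 'item';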
5.2 Case 2: data skew, crippled throughput
The original SQL first:
Case 2 SQL
set odps.sql.mapper.split.size = 4096; select /*+mapjoin(B, D)*/ user_id ,split_part(coalesce( tbbi.get_biz_module( '${bizdate}' ,coalesce( if(x_ad = '1'and x_object_type not in ('zuanshiad', 'txad', 'texiu'), concat('ad_', x_object_type), null) ,x_object_type ) ,'rec_subobj' ) ,'未归类' ), '.', 1) as x_biz ,ind1_name ,0 as home_lead_dye_pay_cnt ,0 as home_lead_dye_pay_amt ,0 as home_lead_dye_pay_cnt_direct ,0 as home_lead_dye_pay_amt_direct ,0 as home_lead_dye_pay_cnt_indirect ,0 as home_lead_dye_pay_amt_indirect ,0 as home_lead_dye_pay_cnt_other_today ,0 as home_lead_dye_pay_amt_other_today ,0 as home_lead_dye_pay_cnt_other_not_today ,0 as home_lead_dye_pay_amt_other_not_today ,if(has_trigger = 1, ipv, 0) as flow_extra_ranse_ipv ,if(has_trigger = 1and trigger_click_ds = '${bizdate}', ipv, 0) as flow_extra_ranse_ipv_today ,if(has_trigger = 1and trigger_click_ds != '${bizdate}', ipv, 0) as flow_extra_ranse_ipv_not_today ,if(A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt ,if(A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt ,if(flow_name = '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_direct ,if(flow_name = '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_direct ,if(flow_name != '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_indirect ,if(flow_name != '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_indirect ,if(trigger_click_ds = '${bizdate}'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_other_today ,if(trigger_click_ds = '${bizdate}'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_other_today ,if(trigger_click_ds != '${bizdate}'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_other_not_today ,if(trigger_click_ds != '${bizdate}'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_other_not_today from( select user_id, flow_name, if(has_trigger = 1, trigger_object_type, sub_item_object_type) as x_object_type, if(has_trigger = 1, trigger_x_ad, sub_x_ad) as x_ad, has_trigger, trigger_click_ds, sub_item_id as item_id, get_json_object(nd_params, '$.cardnum') as cardnum, order_id, pay_ord_amt_1d, pay_ord_itm_qty_1d, ipv from sr_ds.dws_tb_rec_dye_ord_cvr_label_1d where ds = '${bizdate}' and ( flow_name = '手淘首页猜你喜欢' or nd_entry_flow_name = '手淘首页猜你喜欢' ) ) as A left join( select order_id from sr_ds.dwd_tb_anti_pure_order_d where ds = '${bizdate}' group by order_id ) as B on A.order_id = B.order_id left join( select item_id ,to_char(cate_id) as cate_id from tbcdm.dim_tb_itm where ds = '${bizdate}' ) as C on A.item_id = C.item_id left join( select cate_id ,ind1_id ,ind1_name from tbcdm.dim_tm_cate_industry where ds = '${bizdate}' ) as D on C.cate_id = D.cate_id ;
The purpose of the code does not matter here; structurally it is quite simple: one main table joined with three dimension tables, where dimension tables B and D already use MapJoin for acceleration. The DAG of the job's execution instance is shown below:
The two MapJoins run in stages M4 and J6 respectively. At first glance nothing looks wrong and the SQL flow is exactly as expected, yet the actual result is that the job ran for a staggering 6h18min, with the JOIN in stage J6 alone lasting a full 6h11min! Almost the entire job was stuck in the JOIN stage. A task that is unremarkable in both data volume and complexity thus occupied cluster resources for this long, which is disastrous for subsequent data output and downstream tasks. To solve the problem we need to pin down the root cause of the sluggish JOIN stage: is it a resource issue, or is something genuinely wrong with the task itself?
Inspecting the instance details of stage J6 quickly reveals that most tasks have similar input volumes and run times, well within the normal range of variation, but one task stands out as an extreme long tail with a run time of 6h11min, and it is precisely what drags out the JOIN stage. Compared with the other, normal tasks, the long-tail task reads nearly 10x as many input rows, so we can conclude that the long tail is mainly caused by data skew.
Based on what the DAG shows, we can quickly trace back to the problematic fragment in the code. Since a MapJoin would not exhibit such pronounced skew, the skew most likely arises when joining table C, i.e. the main table's item_id column is skewed. To verify this hypothesis, we can profile the distribution of item_id in the main table and see which item_ids account for the largest share; a probe along the lines of the sketch below makes this obvious at a glance:
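A minimal sketch of such a distribution probe (the filters simply mirror subquery A above; the exact probe used in practice may differ):
item_id distribution probe SQL
-- Count rows per item_id in the main table and surface the heaviest keys.
SELECT  sub_item_id AS item_id
        ,COUNT(*)   AS cnt
FROM    sr_ds.dws_tb_rec_dye_ord_cvr_label_1d
WHERE   ds = '${bizdate}'
AND     (flow_name = '手淘首页猜你喜欢' OR nd_entry_flow_name = '手淘首页猜你喜欢')
GROUP BY sub_item_id
ORDER BY cnt DESC
LIMIT   20;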
The result matches our hypothesis almost exactly: the dirty rows with item_id '0' reach the hundred-million level, while even the largest normal item_ids top out at the million level. A gap of that magnitude is what produces the severe data skew. With the problem clear, the solution is at hand: we simply need to spread out the hot (dirty) item_id values before the JOIN. Here we can directly use the SkewJoin feature that MaxCompute provides; its usage has been described earlier in this article, so it is omitted here. The only caveat is that SkewJoin must not conflict with MapJoin: the ordinary join has to be moved ahead for SkewJoin to take effect. The rewritten SQL and DAG are as follows:
Case 2 improved SQL
set odps.sql.mapper.split.size = 4096; select /*+mapjoin(B, D), skewjoin(A(item_id))*/ user_id ,split_part(coalesce( tbbi.get_biz_module( '${bizdate}' ,coalesce( if(x_ad = '1'and x_object_type not in ('zuanshiad', 'txad', 'texiu'), concat('ad_', x_object_type), null) ,x_object_type ) ,'rec_subobj' ) ,'未归类' ), '.', 1) as x_biz ,ind1_name ,0 as home_lead_dye_pay_cnt ,0 as home_lead_dye_pay_amt ,0 as home_lead_dye_pay_cnt_direct ,0 as home_lead_dye_pay_amt_direct ,0 as home_lead_dye_pay_cnt_indirect ,0 as home_lead_dye_pay_amt_indirect ,0 as home_lead_dye_pay_cnt_other_today ,0 as home_lead_dye_pay_amt_other_today ,0 as home_lead_dye_pay_cnt_other_not_today ,0 as home_lead_dye_pay_amt_other_not_today ,if(has_trigger = 1, ipv, 0) as flow_extra_ranse_ipv ,if(has_trigger = 1and trigger_click_ds = '${bizdate}', ipv, 0) as flow_extra_ranse_ipv_today ,if(has_trigger = 1and trigger_click_ds != '${bizdate}', ipv, 0) as flow_extra_ranse_ipv_not_today ,if(A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt ,if(A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt ,if(flow_name = '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_direct ,if(flow_name = '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_direct ,if(flow_name != '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_indirect ,if(flow_name != '手淘首页猜你喜欢'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_indirect ,if(trigger_click_ds = '${bizdate}'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_other_today ,if(trigger_click_ds = '${bizdate}'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_other_today ,if(trigger_click_ds != '${bizdate}'and A.order_id is not null and B.order_id is null, 1, 0) as flow_dye_pay_cnt_other_not_today ,if(trigger_click_ds != '${bizdate}'and A.order_id is not null and B.order_id is null, pay_ord_amt_1d, 0) as flow_dye_pay_amt_other_not_today from( select user_id, flow_name, if(has_trigger = 1, trigger_object_type, sub_item_object_type) as x_object_type, if(has_trigger = 1, trigger_x_ad, sub_x_ad) as x_ad, has_trigger, trigger_click_ds, sub_item_id as item_id, get_json_object(nd_params, '$.cardnum') as cardnum, order_id, pay_ord_amt_1d, pay_ord_itm_qty_1d, ipv from sr_ds.dws_tb_rec_dye_ord_cvr_label_1d where ds = '${bizdate}' and ( flow_name = '手淘首页猜你喜欢' or nd_entry_flow_name = '手淘首页猜你喜欢' ) ) as A left join( select item_id ,to_char(cate_id) as cate_id from tbcdm.dim_tb_itm where ds = '${bizdate}' ) as C on A.item_id = C.item_id left join( select order_id from sr_ds.dwd_tb_anti_pure_order_d where ds = '${bizdate}' group by order_id ) as B on A.order_id = B.order_id left join( select cate_id ,ind1_id ,ind1_name from tbcdm.dim_tm_cate_industry where ds = '${bizdate}' ) as D on C.cate_id = D.cate_id ;
Comparing the two DAGs, it is immediately clear that once skewjoin(A(item_id)) is triggered, the engine automatically splits the main table by its hot keys before computing, avoiding the data skew. After the optimization, the task's output time dropped from over 6 hours to under 1 hour, an improvement of up to 83%! As the saying goes, "without accumulating small steps one cannot travel a thousand li; without accumulating small streams there can be no rivers or seas." A single line of code plus a small reordering brought this large a payoff, which is exactly what day-to-day accumulation and a willingness to experiment are for.
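Conceptually, the SkewJoin hint automates something we could also write by hand: route the hot key through its own branch, let the remaining keys go through an ordinary join, then union the results. A simplified manual sketch is shown below; main_a and dim_c are stand-ins for subqueries A and C above, and it assumes '0' is the only hot key and has no meaningful match in C.
Manual hot-key split sketch SQL
-- Branch 1: hot-key rows skip the join entirely (assumption: item_id = '0' is dirty data
-- with no meaningful match in dim_c, so its dimension columns would be NULL anyway).
SELECT  a.*
        ,CAST(NULL AS STRING) AS cate_id
FROM    main_a a
WHERE   a.item_id = '0'
UNION ALL
-- Branch 2: all remaining keys go through a normal join, now free of the hot key.
SELECT  a.*
        ,c.cate_id
FROM    (SELECT * FROM main_a WHERE item_id <> '0') a
LEFT JOIN dim_c c
ON      a.item_id = c.item_id;
The built-in skewjoin hint effectively performs this split for us (and generalizes it to multiple hot keys), which is why one hint plus a join reordering was enough here.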
5.3 Case 3: Disordered JOINs, Needless Overhead
Let's look at the original SQL first:
Case 3 SQL
set odps.sql.mapper.split.size=8192; insert overwrite table sr_ds.dwd_tb_rec_sc_session_reimps_1d partition(ds = '${bizdate}', leaf_flow_code) with base as ( select t1.visitor_id user_id , x_object_id , t1.item_id , local_time , seller_id , org_brand_id , t4.taggingV5_cate , t4.taggingV5_cate_name , t4.taggingV5_first , tt_cluster_id , tk_cluster_id , xsk_cluster_id , v5_cluster_id , session_id_ut , index_id , real_index_id , page_num , isFixPos , if((isFixPos!='true'or isFixPos is null ),1,0) is_reserve -- 新的去定坑逻辑,20241105更改 , app_version , os , os_version , itemOriMatchType , if(x_biz_original is null, null, coalesce(split_part(x_biz_original, '.', 1), '未归类')) as x_biz , if(x_biz_original is null, null, coalesce(split_part(x_biz_original, '.', 2), '未归类')) as x_sub_biz , card_type , card_subtype , rec_sessionid , pvid , t2.cate_id , ind1_name , cate_level1_name , cate_level2_name , cate_name , xcat1_id , xcat1_name , xcat2_id , xcat2_name , xcat3_id , xcat3_name , bkt_id_88 , bkt_id_99 , bkt_id_9421 , bkt_id_ad , lag(t1.item_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,t1.item_id order by local_time) as pre_item_id_session , lag(tk_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,tk_cluster_id order by local_time) as pre_tk_cluster_id_session , lag(tt_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,tt_cluster_id order by local_time) as pre_tt_cluster_id_session , lag(xsk_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,xsk_cluster_id order by local_time) as pre_xsk_cluster_id_session , lag(t4.taggingV5_cate,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,t4.taggingV5_cate order by local_time) as pre_taggingV5_cate_session , lag(v5_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,v5_cluster_id order by local_time) as pre_v5_cluster_id_session , lag(seller_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,seller_id order by local_time) as pre_seller_id_session , lag(org_brand_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,org_brand_id order by local_time) as pre_org_brand_id_session , leaf_flow_code FROM ( select cnt_pos_chl_code leaf_flow_code , visitor_id , item_id , get_json_object(rec_params,'$.sessionid') rec_sessionid , index_id , pvid , x_object_id , max(real_index_id) real_index_id , min(unix_timestamp(to_date(local_time,'yyyy-mm-dd hh:mi:ss'))) as local_time , min(ds) as ds , max( case when cnt_pos_chl_code ='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','0') when cnt_pos_chl_code!='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'81','0') else'other' end ) bkt_id_88 , max( case when cnt_pos_chl_code='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'99','0') when cnt_pos_chl_code!='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'91','0') else'other' end ) bkt_id_99 , max(palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'9421','0')) bkt_id_9421 , max(palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','26637')) as bkt_id_ad -- 88#26637 , max(session_id) session_id_ut , max(coalesce(tbbi.get_biz_module( '${bizdate}' ,coalesce( if(coalesce(x_gongge_id, '') = '', null, concat('gongge_', x_gongge_id)) ,if(coalesce(x_object_type, '') = 'keyword_miniapp', concat('keyword_miniapp_', x_miniapp_id), null) ,if(coalesce(x_miniapp_id, '') = '', null, concat('miniapp_', x_miniapp_id)) ,if(is_ad = 1 and x_object_type not in 
('zuanshiad', 'txad', 'texiu'), concat('ad_', x_object_type), null) ,x_object_type ) ,'rec_subobj' ) ,'未归类') ) x_biz_original , max(get_json_object(rec_params,'$.newface')) newface , max(keyvalue(get_json_object(rec_params,'$.x_extend'), '#', ':', 'diversify_fixpos')) diversify_fixpos , max(get_json_object(rec_params, '$.page_num')) page_num , max(get_json_object(rec_params,'$.isFixPos')) isFixPos , max(app_version) app_version , max(os) os , max(os_version) os_version , max( GET_JSON_OBJECT( sr_ds:toHoloJsonNew( sr_ds:HimalayasRecAutoParser( concat_ws(',', concat('isClientCache=', coalesce(is_client_cache, '')) ,concat('_afc_id=', coalesce(afc_id, '')) ) ,rec_params ,rec_params ,'offline' ,'' ) ,';' ,':' ),'$.itemOriMatchType' ) ) as itemOriMatchType , max(get_json_object(rec_params, '$.card_type')) card_type , max(get_json_object(rec_params, '$.card_subtype')) card_subtype from tbcdm.dwd_tb_ut_log_wl_imps_rec_di WHERE ds = '${bizdate}' AND visitor_type = 'uid' AND cnt_pos_chl_code in ( '1000000130','1000000039' ,'1000000037','1000000043','1000000044' ,'1000000041','1000000154','1000000038','1000000045' ) and visitor_id is notNULL and item_id is notNULL and get_json_object(rec_params,'$.sessionid') is notNULL and index_id is notNULL and pvid is notNULL and local_time is notNULL AND COALESCE(SPLIT( tbbi.get_biz_module('${bizdate}', COALESCE(IF(COALESCE(x_gongge_id,'') = '',NULL,CONCAT('gongge_',x_gongge_id)), IF(COALESCE(x_object_type,'') = 'keyword_miniapp',CONCAT('keyword_miniapp_',x_miniapp_id),NULL) ,IF(COALESCE(x_miniapp_id,'') = '',NULL,CONCAT('miniapp_',x_miniapp_id)), IF(is_ad = 1 AND x_object_type NOT IN ('zuanshiad','txad','texiu'),CONCAT('ad_',x_object_type),NULL) ,x_object_type) ,'rec_subobj') ,'\\.')[0],'') NOT IN ('宫格','首焦','定坑会场') and coalesce(is_client_cache,'0') <> '1' group by cnt_pos_chl_code , visitor_id , item_id , get_json_object(rec_params,'$.sessionid') , index_id , pvid -- ,local_time -- ,ds -- ,palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','0') , x_object_id -- ,palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','26637') )t1 left join ( select item_id ,title , concat('https://item.taobao.com/item.htm?id=',item_id) as item_url , concat('https://gw.alicdn.com/imgextra/',pict_url) as pict_url , coalesce(from_unixtime(cast(cast(keyvalue(features,'first_starts_time') as bigint) / 1000 as bigint)) , to_date(substr(old_starts,1,10),'yyyy-mm-dd')) online_date , bc_type , seller_id , seller_nick , cate_id , keyvalue(features, ';', ':', 'tags') as auction_tags , if(org_brand_id=0,null,org_brand_id) org_brand_id from tbcdm.dim_tb_itm where ds ='${bizdate}' and is_online='Y' )t2 on t1.item_id = t2.item_id left join ( select * from tbcdm.dim_tm_cate_industry where ds ='${bizdate}' )t3 on t2.cate_id = t3.cate_id left join ( select item_id , max(itemcate_id) taggingV5_cate , max(itemcate) taggingV5_cate_name , split_part(max(itemcate_full_path),"-",1) as taggingV5_first FROM tb_item_base.itemcate_tagging_v5 WHERE ds = max_pt('tb_item_base.itemcate_tagging_v5' ) group by item_id )t4 on t1.item_id = t4.item_id left join ( select item_id , max(cluster_id) tt_cluster_id from palgo_shopping.igraph_item_clusters_map where ds =max_pt('palgo_shopping.igraph_item_clusters_map') group by item_id )t5 on t1.item_id = t5.item_id left join ( select item_id , max(pid) as tk_cluster_id from palgo_shopping.gul_item_tongkuan_pid_v2 where ds = max_pt('palgo_shopping.gul_item_tongkuan_pid_v2') group by item_id )t6 on t1.item_id = t6.item_id left join ( select main_item_id item_id , 
min(cluster_id) xsk_cluster_id from imac.taobao_gul_cards_for_multimodal_emb_to_ssd_div_prepare_v2_cluster where ds = max_pt('imac.taobao_gul_cards_for_multimodal_emb_to_ssd_div_prepare_v2_cluster') group by main_item_id )t7 on t1.item_id = t7.item_id left join ( select itemcate_id taggingV5_cate , max(cluster_id) v5_cluster_id from search_f2e_odps_core.public_label_app_config_similar_category_cluster_online where pt =max_pt('search_f2e_odps_core.public_label_app_config_similar_category_cluster_online') group by itemcate_id )t8 on t4.taggingV5_cate = t8.taggingV5_cate ) select user_id , x_object_id , item_id , local_time , seller_id , org_brand_id , taggingV5_cate , taggingV5_cate_name , taggingV5_first , tt_cluster_id , tk_cluster_id , xsk_cluster_id , v5_cluster_id , session_id_ut , index_id , real_index_id , page_num , isFixPos , is_reserve , app_version , os , os_version , itemOriMatchType , x_biz , x_sub_biz , card_type , card_subtype , rec_sessionid , pvid , cate_id , ind1_name , cate_level1_name , cate_level2_name , cate_name , xcat1_id , xcat1_name , xcat2_id , xcat2_name , xcat3_id , xcat3_name , bkt_id_88 , bkt_id_99 , bkt_id_9421 , bkt_id_ad , if(item_id is not null and pre_item_id_session is not null,1,0) is_same_item_session , if(tk_cluster_id is not null and pre_tk_cluster_id_session is not null,1,0) is_same_tk_session , if(tt_cluster_id is not null and pre_tt_cluster_id_session is not null,1,0) is_same_tt_session , if(xsk_cluster_id is not null and pre_xsk_cluster_id_session is not null,1,0) is_same_xsk_session , if(taggingv5_cate is not null and pre_taggingv5_cate_session is not null,1,0) is_same_v5_session , if(v5_cluster_id is not null and pre_v5_cluster_id_session is not null,1,0) is_same_v5_cluster_session , if(seller_id is not null and pre_seller_id_session is not null,1,0) is_same_seller_session , if(org_brand_id is not null and pre_org_brand_id_session is not null,1,0) is_same_brand_session , leaf_flow_code from base ;
Again, let's set the code logic aside and look only at the task plan: this job contains 7 JOINs in total, one main table joined with 7 dimension tables. The DAG of the job's execution instance is shown below:
The DAG shows only 2 JOIN stage nodes. Normally ODPS executes joins in the order they are written in the code; when several consecutive joins share the same JOIN key, ODPS merges them into a single JOIN stage, that is, the rows from the various dimension tables for a given key value are shipped to the same node and joined together, avoiding the cost of extra Shuffle steps. Working backwards from the DAG to the code structure, this would imply the SQL has 2 distinct JOIN keys and that the joins are written grouped by key. But looking back at the SQL, that is not the case: the actual JOIN key order is item_id, cate_id, item_id, item_id, item_id, item_id and taggingV5_cate, so ODPS's automatic merging should produce 4 JOIN stages. To find out why it does not, we need to look at the detailed execution of these two JOIN stages, as shown below:
As we can see, ODPS has automatically added a ConditionalMapJoin step: it recognized that the two dimension tables joined on cate_id and taggingV5_cate are small and, depending on the actual run, may automatically upgrade those joins to MapJoins to speed up execution. This raises the next question: if the dimension tables keyed on cate_id and taggingV5_cate have already been converted to MapJoins, and all the remaining JOIN keys are item_id, why are there still 2 JOIN stages? This is exactly the core point of this case: the order in which JOINs are written, and choosing the right JOIN method, matter a great deal.
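To make the merging rule concrete, here is a stripped-down sketch (the table and key names are placeholders, not the tables above): consecutive joins on the same key can be folded into a single JOIN stage, while interleaving a different key in between forces an extra stage and an extra Shuffle.
JOIN-order sketch SQL
-- Interleaved keys (key_a, key_b, key_a): typically 3 merge-join stages.
SELECT  m.key_a, m.key_b, d1.v1, d2.v2, d3.v3
FROM    main_t m
LEFT JOIN dim_1 d1 ON m.key_a = d1.key_a
LEFT JOIN dim_2 d2 ON m.key_b = d2.key_b
LEFT JOIN dim_3 d3 ON m.key_a = d3.key_a;

-- Same joins grouped by key (key_a, key_a, key_b): typically 2 merge-join stages.
SELECT  m.key_a, m.key_b, d1.v1, d2.v2, d3.v3
FROM    main_t m
LEFT JOIN dim_1 d1 ON m.key_a = d1.key_a
LEFT JOIN dim_3 d3 ON m.key_a = d3.key_a
LEFT JOIN dim_2 d2 ON m.key_b = d2.key_b;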
Although ODPS can sometimes apply sensible optimizations for us, the execution plan is ultimately determined by our SQL. In this example, ODPS does offer a ConditionalMapJoin, but it cannot guarantee the MapJoin will actually run: that depends on the size of the joined tables in each instance, and on any day the job does not qualify for MapJoin it falls back to MergeJoin. The conditional optimization therefore does not change our JOIN order, which is why there are still at least 2 JOIN stages. By now the reader has probably realized that a small change in JOIN order is enough to save one Shuffle and produce a very different plan: simply execute in the order item_id, item_id, item_id, item_id, item_id, cate_id and taggingV5_cate. Data profiling further shows that, besides the cate_id and taggingV5_cate tables, one more dimension table is small enough for us to force a MapJoin by hand, which gives us the following SQL and execution plan:
Case 3 improved SQL
set odps.sql.mapper.split.size=8192; set odps.sql.mapjoin.memory.max=2048; insert overwrite table sr_ds.dwd_tb_rec_sc_session_reimps_1d partition(ds = '${bizdate}', leaf_flow_code) with base as ( select /*+ DISTMAPJOIN (t6(shard_count=39,replica_count=2), t7(shard_count=59,replica_count=2)), mapjoin(t5, t8, t3) */ t1.visitor_id user_id , x_object_id , t1.item_id , local_time , seller_id , org_brand_id , t4.taggingV5_cate , t4.taggingV5_cate_name , t4.taggingV5_first , tt_cluster_id , tk_cluster_id , xsk_cluster_id , v5_cluster_id , session_id_ut , index_id , real_index_id , page_num , isFixPos , if((isFixPos!='true'or isFixPos is null ),1,0) is_reserve -- 新的去定坑逻辑,20241105更改 , app_version , os , os_version , itemOriMatchType , if(x_biz_original is null, null, coalesce(split_part(x_biz_original, '.', 1), '未归类')) as x_biz , if(x_biz_original is null, null, coalesce(split_part(x_biz_original, '.', 2), '未归类')) as x_sub_biz , card_type , card_subtype , rec_sessionid , pvid , t2.cate_id , ind1_name , cate_level1_name , cate_level2_name , cate_name , xcat1_id , xcat1_name , xcat2_id , xcat2_name , xcat3_id , xcat3_name , bkt_id_88 , bkt_id_99 , bkt_id_9421 , bkt_id_ad -- 同品 , lag(t1.item_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,t1.item_id order by local_time) as pre_item_id_session , lag(tk_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,tk_cluster_id order by local_time) as pre_tk_cluster_id_session , lag(tt_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,tt_cluster_id order by local_time) as pre_tt_cluster_id_session , lag(xsk_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,xsk_cluster_id order by local_time) as pre_xsk_cluster_id_session , lag(t4.taggingV5_cate,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,t4.taggingV5_cate order by local_time) as pre_taggingV5_cate_session , lag(v5_cluster_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,v5_cluster_id order by local_time) as pre_v5_cluster_id_session , lag(seller_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,seller_id order by local_time) as pre_seller_id_session , lag(org_brand_id,1) over (partition by leaf_flow_code,visitor_id,t1.rec_sessionid,org_brand_id order by local_time) as pre_org_brand_id_session , leaf_flow_code FROM ( select cnt_pos_chl_code leaf_flow_code , visitor_id , item_id , get_json_object(rec_params,'$.sessionid') rec_sessionid , index_id , pvid , x_object_id , max(real_index_id) real_index_id , min(unix_timestamp(to_date(local_time,'yyyy-mm-dd hh:mi:ss'))) as local_time , min(ds) as ds , max( case when cnt_pos_chl_code ='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','0') when cnt_pos_chl_code!='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'81','0') else'other' end ) bkt_id_88 , max( case when cnt_pos_chl_code='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'99','0') when cnt_pos_chl_code!='1000000130' then palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'91','0') else'other' end ) bkt_id_99 , max(palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'9421','0')) bkt_id_9421 , max(palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','26637')) as bkt_id_ad -- 88#26637 , max(session_id) session_id_ut , max(coalesce(tbbi.get_biz_module( '${bizdate}' ,coalesce( if(coalesce(x_gongge_id, '') = '', null, concat('gongge_', x_gongge_id)) ,if(coalesce(x_object_type, '') = 'keyword_miniapp', 
concat('keyword_miniapp_', x_miniapp_id), null) ,if(coalesce(x_miniapp_id, '') = '', null, concat('miniapp_', x_miniapp_id)) ,if(is_ad = 1 and x_object_type not in ('zuanshiad', 'txad', 'texiu'), concat('ad_', x_object_type), null) ,x_object_type ) ,'rec_subobj' ) ,'未归类') ) x_biz_original , max(get_json_object(rec_params,'$.newface')) newface , max(keyvalue(get_json_object(rec_params,'$.x_extend'), '#', ':', 'diversify_fixpos')) diversify_fixpos , max(get_json_object(rec_params, '$.page_num')) page_num , max(get_json_object(rec_params,'$.isFixPos')) isFixPos , max(app_version) app_version , max(os) os , max(os_version) os_version , max( GET_JSON_OBJECT( sr_ds:toHoloJsonNew( sr_ds:HimalayasRecAutoParser( concat_ws(',', concat('isClientCache=', coalesce(is_client_cache, '')) ,concat('_afc_id=', coalesce(afc_id, '')) ) ,rec_params ,rec_params ,'offline' ,'' ) ,';' ,':' ),'$.itemOriMatchType' ) ) as itemOriMatchType , max(get_json_object(rec_params, '$.card_type')) card_type , max(get_json_object(rec_params, '$.card_subtype')) card_subtype from tbcdm.dwd_tb_ut_log_wl_imps_rec_di WHERE ds = '${bizdate}' AND visitor_type = 'uid' AND cnt_pos_chl_code in ( '1000000130','1000000039' ,'1000000037','1000000043','1000000044' ,'1000000041','1000000154','1000000038','1000000045' ) and visitor_id is notNULL and item_id is notNULL and get_json_object(rec_params,'$.sessionid') is notNULL and index_id is notNULL and pvid is notNULL and local_time is notNULL AND COALESCE(SPLIT( tbbi.get_biz_module('${bizdate}', COALESCE(IF(COALESCE(x_gongge_id,'') = '',NULL,CONCAT('gongge_',x_gongge_id)), IF(COALESCE(x_object_type,'') = 'keyword_miniapp',CONCAT('keyword_miniapp_',x_miniapp_id),NULL) ,IF(COALESCE(x_miniapp_id,'') = '',NULL,CONCAT('miniapp_',x_miniapp_id)), IF(is_ad = 1 AND x_object_type NOT IN ('zuanshiad','txad','texiu'),CONCAT('ad_',x_object_type),NULL) ,x_object_type) ,'rec_subobj') ,'\\.')[0],'') NOT IN ('宫格','首焦','定坑会场') and coalesce(is_client_cache,'0') <> '1' group by cnt_pos_chl_code , visitor_id , item_id , get_json_object(rec_params,'$.sessionid') , index_id , pvid -- ,local_time -- ,ds -- ,palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','0') , x_object_id -- ,palgo_gul:newgul_parse_bucketId(tpp_buckets_info,'88','26637') )t1 left join ( select item_id ,title , concat('https://item.taobao.com/item.htm?id=',item_id) as item_url , concat('https://gw.alicdn.com/imgextra/',pict_url) as pict_url , coalesce(from_unixtime(cast(cast(keyvalue(features,'first_starts_time') as bigint) / 1000 as bigint)) , to_date(substr(old_starts,1,10),'yyyy-mm-dd')) online_date , bc_type , seller_id , seller_nick , cate_id , keyvalue(features, ';', ':', 'tags') as auction_tags , if(org_brand_id=0,null,org_brand_id) org_brand_id from tbcdm.dim_tb_itm where ds ='${bizdate}' and is_online='Y' )t2 on t1.item_id = t2.item_id left join ( select item_id , max(itemcate_id) taggingV5_cate , max(itemcate) taggingV5_cate_name , split_part(max(itemcate_full_path),"-",1) as taggingV5_first FROM tb_item_base.itemcate_tagging_v5 WHERE ds = max_pt('tb_item_base.itemcate_tagging_v5' ) group by item_id )t4 on t1.item_id = t4.item_id left join ( select item_id , max(pid) as tk_cluster_id from palgo_shopping.gul_item_tongkuan_pid_v2 where ds = max_pt('palgo_shopping.gul_item_tongkuan_pid_v2') group by item_id )t6 on t1.item_id = t6.item_id left join ( select main_item_id item_id , min(cluster_id) xsk_cluster_id from imac.taobao_gul_cards_for_multimodal_emb_to_ssd_div_prepare_v2_cluster where ds = 
max_pt('imac.taobao_gul_cards_for_multimodal_emb_to_ssd_div_prepare_v2_cluster') group by main_item_id )t7 on t1.item_id = t7.item_id left join ( select item_id , max(cluster_id) tt_cluster_id from palgo_shopping.igraph_item_clusters_map where ds =max_pt('palgo_shopping.igraph_item_clusters_map') group by item_id )t5 on t1.item_id = t5.item_id left join ( select itemcate_id taggingV5_cate , max(cluster_id) v5_cluster_id from search_f2e_odps_core.public_label_app_config_similar_category_cluster_online where pt =max_pt('search_f2e_odps_core.public_label_app_config_similar_category_cluster_online') group by itemcate_id )t8 on t4.taggingV5_cate = t8.taggingV5_cate left join ( select * from tbcdm.dim_tm_cate_industry where ds ='${bizdate}' )t3 on t2.cate_id = t3.cate_id ) select user_id , x_object_id , item_id , local_time , seller_id , org_brand_id , taggingV5_cate , taggingV5_cate_name , taggingV5_first , tt_cluster_id , tk_cluster_id , xsk_cluster_id , v5_cluster_id , session_id_ut , index_id , real_index_id , page_num , isFixPos , is_reserve , app_version , os , os_version , itemOriMatchType , x_biz , x_sub_biz , card_type , card_subtype , rec_sessionid , pvid , cate_id , ind1_name , cate_level1_name , cate_level2_name , cate_name , xcat1_id , xcat1_name , xcat2_id , xcat2_name , xcat3_id , xcat3_name , bkt_id_88 , bkt_id_99 , bkt_id_9421 , bkt_id_ad , if(item_id is not null and pre_item_id_session is not null,1,0) is_same_item_session , if(tk_cluster_id is not null and pre_tk_cluster_id_session is not null,1,0) is_same_tk_session , if(tt_cluster_id is not null and pre_tt_cluster_id_session is not null,1,0) is_same_tt_session , if(xsk_cluster_id is not null and pre_xsk_cluster_id_session is not null,1,0) is_same_xsk_session , if(taggingv5_cate is not null and pre_taggingv5_cate_session is not null,1,0) is_same_v5_session , if(v5_cluster_id is not null and pre_v5_cluster_id_session is not null,1,0) is_same_v5_cluster_session , if(seller_id is not null and pre_seller_id_session is not null,1,0) is_same_seller_session , if(org_brand_id is not null and pre_org_brand_id_session is not null,1,0) is_same_brand_session , leaf_flow_code from base ;
Compared with the original SQL, the improved version only requires a slight reordering of the JOINs, plus the following two settings:
set odps.sql.mapjoin.memory.max=2048;
/*+ DISTMAPJOIN (t6(shard_count=39,replica_count=2), t7(shard_count=59,replica_count=2)), mapjoin(t5, t8, t3) */
With these in place, one redundant MergeJoin and its Shuffle are eliminated, markedly improving the overall efficiency of the JOIN stage. Given the relative table sizes in this job, the optimization also applies the DISTMAPJOIN method introduced earlier. With just these few simple operations, and comparing the same weekly window (Thursday through Sunday) over two consecutive weeks, the task's average output time fell from 4.2h to 3.56h; this one step alone makes the daily data available more than 40 minutes earlier!
Source | Alibaba Cloud Developer WeChat official account (阿里云开发者公众号)
Author | 李伯瑞 (不累)