离线数仓任务优化
优化方向
无用数据尽早过滤掉 合理的分配、利用资源,使每个task处理的数据量适中,避免启动过多或过少的task 减少shuffle或者减少参与shuffle的数据量 避免数据倾斜,遇到倾斜的key进行打散处理
优化层面
业务层面
思考业务逻辑的合理性、可行性、必要性 计算量太大是不是必须的,是否可以减少参与计算的用户量或者时间跨度 计算逻辑是否过于复杂,是否可以简化
模型层面
设计合理的数仓模型 是否有现成的数据可以使用或者基于现成的数据进行加工 是否可以将整个计算逻辑进行合理拆分,降低每个子任务的复杂度,同时提高复用的可能性 维度退化,空间和时间的权衡
系统层面
遵循一些计算引擎建议的使用规则和参数设置 使用Spark3引擎,自动合并小文件 输入文件的存储格式、压缩格式、大小 输出文件的大小 启用压缩 分区、分桶 拉链表 yarn队列的设置 合适的计算引擎 task的内存设置 task处理的数据量 task的数量 并行度优化 调整参数减少Map数量 调整参数减少reduce数量
sql、代码层面
列裁剪,避免select * 分区裁剪,使用分区字段过滤 条件限制 谓词下推 map端预聚合 大key的过滤 打散倾斜key 合适的join方式 用Distribute By Rand控制分区中数据量 group by优化 中间结果的缓存和复用 小文件优化
任务层面
减少任务依赖,尽可能缩短链路 业务链路/逻辑重构/改写 任务分级,任务数评估,错峰调度 任务依赖降级,周级别的任务依赖天级别,天级别依赖小时级别,小时级别依赖分钟级别。 避免频繁创建任务 核心任务优先保证产出,双链路机制开启 耗时长的任务拆分成子任务。任务批次提交 资源动态扩容 资源腾挪调整 无用任务下线
hive常用优化手段&参数
spark常用优化手段&参数
spark-sql常用优化手段&参数
自适应中reduce参数控制
spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。 spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。 spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。
合理设置单partition读取数据量
SET spark.sql.files.maxPartitionBytes=xxxx;
合理设置shuffle partition的数量
SET spark.sql.shuffle.partitions=xxxx
使用coalesce & repartition调整partition数量
SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
使用broadcast join
开启Adaptive Query Execution(Spark 3.0)
1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区 spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数 spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小 spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数 当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并 spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小 2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join 3、动态申请资源: 当计算过程中资源不足会自动申请资源 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.dynamicAllocation.enabled: 是否开启动态资源申请 spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪 4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。 spark.sql.adaptive.enabled: 是否开启AQE优化 倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N 倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M 拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes G [代表优化之后,分区数数据的预期大小] sparksql判断出现数据倾斜的依据[需要两个条件同时满足]: 当某个分区处理的数据量>= N * 所有task处理数据量的中位数 当某个分区处理的数据量>= M
文件与分区
SET spark.sql.files.maxPartitionBytes=xxx //读取文件的时候一个分区接受多少数据; spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值
CBO优化
spark.sql.cbo.enabled: 是否开启cbo优化 spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序 spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序
hints优化
hints预防主要用在分区和join上。
Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE
Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL
SELECT /*+ COALESCE(3) */ * FROM t; SELECT /*+ REPARTITION(3) */ * FROM t; SELECT /*+ REPARTITION(c) */ * FROM t; SELECT /*+ REPARTITION(3, c) */ * FROM t; SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t; SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t; ## Join Hints for broadcast join SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key; SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; ## Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; ## Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; ## When different join strategy hints are specified on both sides of a join, Spark ## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint ## over the SHUFFLE_REPLICATE_NL hint. ## Spark will issue Warning in the following example ## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge) ## is overridden by another hint and will not take effect. SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
缓存表
对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用
SQLContext.cacheTable(TableName)或者DataFrame.cache即可
,SparkSQL会用内存列存储的格式进行表的缓存,然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和GC的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用SQLContext.setConf()设置,可以通过
spark.sql.inMemoryColumnarStorage.batchSize
这个参数,默认10000,配置列存储单位。
永久视图 view:永久保存一段查询语句的逻辑,而不是查询语句的数据,永久有效,查询这个视图,相当于查询一个SQL语句,如果保存的查询逻辑复杂,这查询视图也耗时长。支持重新覆盖 create or replace view view1 as 临时视图 temporary view:只在当前会话生效,如果会话结束,则临时视图失效,支持重新覆盖 create or replace temporary view temp_view1 as,类似于 SparkSQL 中的 DataFrame.createOrReplaceTempView('视图名'),hive不支持这个语法 缓存表cache table:只在当前会话有效,将一段查询结果集缓存到内存,并赋予一个表名。 table:永久有效,保存数据结构和数据本身到磁盘。 with as:当子查询的嵌套层数太多时,可以用with as 增加可读性。
group by优化
为了提高 group by 查询的性能,可以尝试以下几种方法: 仅选择必要的字段进行 group by 操作,避免选择过多的字段。 尽可能将 group by 字段类型保持一致,以减少数据转换的开销。 如果可能,可以将 group by 字段进行哈希分区,以减少数据传输和处理的开销。 如果使用的是字符串类型,可以考虑使用哈希函数来减少字符串比较的开销。
优化倾斜连接
数据偏斜会严重降低联接查询的性能。此功能通过将倾斜的任务拆分(按需复制)为大小大致相等的任务来动态处理排序合并联接中的倾斜。同时启用spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置时,此选项才生效。
实时数仓任务优化
flink任务优化
时效优化
1:查看kafka延迟监控:flink 消费上游的 lag(比如看消费 kafka lag 情况) 2:分层和时延之间做好平衡和取舍,既需要保证复用性,又要避免造成链路过长。 3:乱序数据的处理。 4:提前压测,应对流量高峰期,特别是大促场景下,提前做好资源保障、任务优化等措施。 5:设置好延时基线,通过优化程序代码、资源、解决倾斜与反压等问题,使其控制在base line之内。 6:指标监控,监控任务failover情况、checkpoint指标、GC情况、作业反压等,出现异常告警。 7:Flink链路延迟监控的LatencyMarker观测延迟情况。
数据质量保障:
## 数据一致性 1:正确性实时计算端到端的一致性,对数据正确性的影响,常用手段就是通过输出幂等方式保障,这种方式要求输出使用存储介质支持重写,对于不支持幂等的存储,比较常用的就是DWD层的kafka, 可能会产生重复的数据,那么在下游使用的时候可以使用row_number() 语法进行去重,保证相同的key不会被多次计算; 2:离线与实时的一致性,需要保证使用数据源一致、加工业务逻辑一致。 ## 数据完整性: 目标有效数据从数据源头到数据加工再到前端数据展示,不能因为加工逻辑权限,存储异常,前端展现异常等原因导致数据丢失。例如: 1:数据源层出现背压时,导致数据源头(MQ,KAFKA)消息积压,积压严重时导致资源耗尽,进而导致数据丢失; 2:数据处理层数据加工未按照需求进行加工,导致目标有效数据丢失; 3:数据存储层的存储容量写满时,导致新数据无法继续写入导致数据丢失; 4:数据加工正确性、数据加工及时性、数据快速恢复性构成数据完整性; ## 数据加工正确性: 目标源数据按照业务需求加工成目标有效数据,目标有效数据根据不同维度不同指标计算成需要展示的不同指标数据。例如: 1:数据源层原始数据包含不同联盟的点击数据,那么数据处理层过滤掉不需要的联盟点击数据,并将目标联盟的点击数据根据媒体和创意信息补齐当前点击所属的账号、计划、单元; 2:业务层根据媒体,账号、计划、单元不同维度计算出对应的点击总量; ## 数据加工及时性: 目标源数据从产生到前端展示的时间需要控制合理的时间范围内; ## 数据快速恢复性: 数据在流转路径中因为异常导致流转中断,数据停止在某一个环节中,当异常解决,系统恢复正常时,停止的数据(停止的数据)需要快速恢复流转,并且这种恢复是正确的,不应该存在重复的消费和加工或者遗漏。例如: 1:数据处理层因为消费程序性能问题导致消息积压,性能问题解决后数据挤压问题逐步得到缓解直到恢复正常水平; 2:数据处理层因为消费程序bug导致程序崩溃,重启后数据消费正常; ## 数据可监控性: 数据流转路径中关键节点的关键状态可以有效监控; ## 数据高可用性: 数据不能因为灾难性的问题导致丢失造成不能使用的情况,因此需要考虑实时数据消费应用集群和存储集群的主备和可容灾;
成本优化
## 计存成本优化 cpu、内存、network合理选择 cpu进程绑定 数据分区分桶,选择合适的文件格式 数据分级与压缩存储 建立数据、数仓共享方案 计算引擎的选择 无用、低频数据下线或者冷备 数据布局优化技术 优化系统 ... ## 资源弹性伸缩容(云原生k8s等) ## 资源隔离,存算分离
稳定性保证
任务压测 提前压测应对流量高峰期,特别是大促场景下,提前做好资源保障、任务优化等措施。 任务分级 制定保障等级,从任务影响面大小、数据使用方来划分,一般情况公司层面优先于部门层面,外部使用优先于内部使用, 高优先级任务需要优先/及时响应、必要情况下做双链路保障机制; 做好指标监控 指标监控,监控任务failover情况、checkpoint指标、GC情况、作业反压等,出现异常告警。 高可用HA 整个实时Pipeline链路都应该选取高可用组件,确保理论上整体高可用;在数据关键链路上支持数据备份和重演机制;在业务关键链路上支持双跑融合机制 SLA保障 在确保集群和实时Pipeline高可用的前提下,支持动态扩容和数据处理流程自动漂移 弹性反脆弱 基于规则和算法的资源弹性伸缩;支持事件触发动作引擎的失效处理。 监控预警 集群设施层面,物理管道层面,数据逻辑层面的多方面监控预警能力 自动运维 能够捕捉并存档缺失数据和处理异常,并具备定期自动重试机制修复问题数据 上游元数据变更抗性 上游业务库要求兼容性元数据变更;实时Pipeline处理显式字段。
任务调度优化
减少任务依赖,尽可能缩短链路 业务链路/逻辑重构/改写 任务分级,任务数评估,错峰调度 任务依赖降级,周级别的任务依赖天级别,天级别依赖小时级别,小时级别依赖分钟级别。 避免频繁创建小任务 核心任务优先保证产出,双链路机制开启 耗时长的任务拆分成子任务。任务批次提交 资源动态扩容 资源腾挪调整 无用任务下线
离线数仓任务延迟
## 1、紧急修复故障 公司集群机器下线,肯定是出了故障,那第一优先级当然是尽快核查集群机器下线的原因,然后给出针对性解决方案,如果短时间内集群问题没法解决,也要尽快升级领导,把对业务的影响讲清楚,如果上级重视了,可能就会帮你协调到更高端的技术资源,这个工作一定要同步进行,一定要给集群支撑方足够的压力,这叫对症下药,也是治本的方法,其他方法说起来都是曲线救国。 这一步做到位了,如果时间的确紧急,那就走到下一步。 ## 2:资源动态扩容 既然是集群,按道理资源是有冗余的吧,那么临时动态扩容是最基本的方法,这也是云计算存在的意义吧,如果这一步都做不到,至少说明系统规划没做好啊,难不成现在数据仓库还是单机?如果是这样,就要考虑数据仓库云化的方法,现在hadoop大数据平台架构已经很成熟了。 这一步如果做不到,那就记下来跟规划部门秋后算账,然后继续往下走。 ## 3、资源腾挪调整 集群资源的使用也存在2/8现象,既然是核心任务,肯定有很多非核心任务,那就把其他非核心任务的资源腾挪部分给核心任务,假如是hadoop集群,可以采取调整队列的方法来快速增加资源,当然前提是要能够大致判断调整后的业务影响,不过这种抢别人资源的方式还是比较简单粗暴。 如果资源调无可调,那就继续往下走。 ## 4、任务分级、双链路切换 如果资源腾挪不现实,比如短时间内难以在资源层面进行精细化的调度,那就对所有任务的优先级进行排序,把核心任务的调度优先级提升,调低其他任务的优先级,确保核心任务的生成时间满足要求,当然前提是对所有任务的重要程度、相互依赖程度和对业务的影响有比较清晰的认识,这些功夫都在诗外,临时仓促的去调整任务优先级可能会导致严重的后果。 制定任务优先级,可以从任务影响面大小、数据使用方来划分,一般情况公司层面优先于部门层面,外部使用优先于内部使用, 高优先级任务需要优先/及时响应、必要情况下做双链路保障机制;把非核心的任务靠后运行 。 如果任务有成千上万,互相之间有千丝万缕的关系,短时间不具备梳理清楚和操作的条件,那就继续往下走。 ## 5、任务代码优化 核心任务一般是比较复杂的,消耗的资源也比较多,意味着有较多的优化空间,原来家里有余粮的时候可能不太会关注代码的质量和效率,现在不得不去做优化了,这就看开发人员的能力了,技术大拿在这个时候往往显示出了价值,我们以前就通过将hive换成spark取得了不错的提速效果。 如果代码优化的空间仍然有限,那就继续往下走。 ## 6、降低任务依赖 数据仓库建模通过模型的复用可以有效提升上层应用的整体支撑效率,但回归到单个应用的任务,由于需要依赖仓库模型的生成,反倒影响了生成速度,这就是局部和全局最优的矛盾。通过剥离核心任务对数据仓库模型的依赖,为其量身定做一套数据处理逻辑,则可以大幅提升效率,后果是造成了资源的浪费,加剧了数据仓库整体资源的紧缺状态,当然非常时期非常手段,有时候为了考核保障就不得不这么做。 如果这样也不行,那就继续往下走。 ## 7、任务错峰调度、批量提交、无效任务下线,高消耗任务拆解 因为有些有些不同的任务执行时间也会根据数据量、数据分布、分配资源等因素,造成执行时长是不一样的。对于一些任务可以将执行时间相差不大的,放在一起进行批量提交。对于低频或无效任务进行下线或降级。而对于高消耗资源的任务,可以对任务进行拆解成子任务,然和进行提交。 ## 8、核心任务容灾 既然核心任务这么重要,而单集群也不可信任,那就不能把所有鸡蛋放在一个篮子里,容灾或者应急是常规做法,比如我们为了确保某些作业万无一失,常常会采取异地异构(核心任务分别放在hadoop和MPP集群)的解决方案,前提是核心任务规模不大,否则投资和成本都吃不消,数据仓库由于数据量大的特点,一般都不会做容灾方案,虽然集群垮掉是极小概率事件,但集群性能下降是很大概率事件,比如hadoop一个参数调整就可能大幅降低数据处理的效率。 ## 9、做好解释工作 核心任务延迟肯定影响了业务,面对这种情况,一方面要及时跟上级做好沟通汇报,协同各方做好故障的分析,给出可以落地的解决方案,如果下属能拿着这7个方案来让我决策,我会很满意,另一方面,要做好核心任务延迟对业务造成的实际影响的评估,做到心中有数,同时跟业务方做好解释工作,适当降低业务的预期。 能做到这一步,我想已经超越了大多数人,因为这不是简单的技术问题,对于处理人员的综合素质要求挺高。 ## 10、转危机为机会 出故障对于数据仓库来讲既是挑战,其实也是机会,平时没出问题的时候业务感受不到数据仓库的价值,要争取些资源还挺难的,如果故障真的对业务造成了较大的影响,可能会让公司重新审视数据仓库的价值。 记得以前某次IT系统挂了,造成了较大的社会影响,后来分析出来的原因是容量不足,然后公司对规划部门、市场部门、IT部门的负责