Flink批处理优化器之成本估算

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。

成本估算

在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。

什么是成本

Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加、减、乘、除)以及对这些因素未知值的认定与校验。

“cost”一词也有译作:开销、代价,将其视为同义即可。

Flink当前将成本估算的因素划分为两大类:

  • 可量化的成本估算因素:指代通过跟踪一个可量化的测量指标可以计算出的成本估算因素(比如网络或I/O的字节数);
  • 启发式的成本估算因素:指代那些不可定量计算的成本估算因素,因此只能给出一些定性的经验值;

当前被纳入成本估算的因素如下:

  • 网络成本;
  • 磁盘I/O成本;
  • CPU成本;
  • 启发式网络成本;
  • 启发式磁盘成本;
  • 启发式CPU成本;

可量化的成本估算因素可能经常会被设置为未知的(UNKNOWN,在Costs中以字面常量值-1表示)。当可量化的成本估算因素被置为未知时,所有操作的成本都将变成未知的,因此这将导致在进行优化裁剪期间,无法决策出哪个偏向的操作。在这种情况下,启发式的成本估算因素必须能发挥作用,它应该包含一个值来确保以不同策略执行的运算符是可比较的(甚至在无法估算的情况下)。

如何估算成本

成本的估算借助于成本估算器(CostEstimator),CostEstimator定义了一系列增加成本的方法,这些方法有待具体的估算器实现,它们大致分为三大类:

  • 增加传输策略的成本;
  • 增加本地策略的成本;
  • 增加屏障的成本;

CostEstimator借助于以上这几类方法,可完成对一个运算符总成本的计算,具体的计算逻辑封装在方法costOperator中,该方法接收一个计划节点(PlanNode)参数,然后按照传输策略和本地策略分别进行枚举与计算。完整的方法如下:

DefaultCostEstimator继承自CostEstimator,作为默认的(也是唯一的)成本估算器。它实现了上面计算成本逻辑中调用的一系列增加成本的addXXX方法。这些方法中的绝大部分,又依赖于预算提供器(EstimateProvider)所提供的预算数据,然后根据不同的增加成本的算法逻辑,利用这些预算数据做计算。比如我们以新增广播成本的addBroadcastCost方法为示例,其实广播传输方式说白了就是将数据复制到当前运算符的所有输出通道中,因此这里对成本的计算取决于复制因子,代码如下:

预算提供者

前面我们谈论了如何通过CostEstimator来估算成本,但其实CostEstimator是在已获得预算数据的基础上应用相关的算法来算出成本的,而用来估算成本的预算数据其实是来自预算提供者(EstimateProvider)。Flink批处理中所有的运算符都有一个基于优化器的内部表示,我们可以称它们为优化器运算符,这些运算符创建于优化操作之前,且它们都必须实现EstimateProvider接口。各个优化器运算符根据自己的实现以及语义将成本估算相关的信息暴露给外部查询。目前被纳入预算的信息有:

  • 输出的数据流大小:由接口方法getEstimatedOutputSize提供;
  • 输出的记录数:由接口方法getEstimatedNumRecords提供;
  • 单个输出记录的平均字节数:由接口方法getEstimatedAvgWidthPerOutputRecord提供;

在dag包下,EstimateProvider接口的继承关系图如下:

EstimateProvider

其中,OptimizerNode是所有被优化的运算符继承的基类,因此所有优化器运算符都是预算提供者。OptimizerNode为绝大部分的优化器运算符提供了统一的预算计算方法computeOutputEstimates。

为什么说是绝大部分运算符呢?因为有些运算符是特殊的,比如双输入端union运算符BinaryUnionNode以及迭代相关的运算符。

所有的运算符都会在优化时被遍历,Flink提供了一个编号及预算遍历器(IdAndEstimatesVisitor)来对所有运算符进行逐个遍历并计算预算,这一点体现在Optimizer的compile方法的下面这行代码中:

而在IdAndEstimatesVisitor的postVisit方法中即调用computeOutputEstimates方法来计算预算。下面,我们来分析一下预算是如何计算得出的,总得来说computeOutputEstimates的逻辑被分为两部分:

  • 各个具体的运算符计算它们特定的预算;
  • 根据编译提示(CompilerHints)覆盖原有的预算计算;

OptimizerNode将特定运算符的预算计算定义成名为computeOperatorSpecificDefaultEstimates的抽象方法开放给派生类根据自身的特定逻辑实现。然后,如果该运算符如果设置有CompilerHints的话,将会根据CompilerHints覆盖原有的预算结果。

所谓CompilerHints,它是封装了描述用户函数行为的编译提示,它可用于改进优化器对计划的选择。如果给某个运算符设置编译提示的话,那么在计算预算时,将会用它来覆盖运算符自身给出的中间结果的预算。当前,CompilerHints在优化器中没有得到太大的机会发挥。

因为CompilerHints没有被广泛应用,所以预算的计算还是依赖各个运算符具体提供,所以我们关注一下computeOperatorSpecificDefaultEstimates方法。该方法完全是按照具体运算符的语义特征来实现的,我们选择看其中的几个实现:

二元union运算符的预算就是累加其两个输入端:

Cross运算符的处理方式是:

从上面的两个运算符对预算的计算可见,它们大都依赖上游运算符的输出预算。而最初的预算肯定由source运算符决定,因为只有source才能知道数据的具体规模。

所以,我们来看一下DataSourceNode,很明显它作为数据的输入源,是最有可能了解初始数据集大小的运算符,为此Flink定义了一个专门用于统计的对象BaseStatistics,它用于统计对接外部的数据源的预算信息。但并非每个数据源的信息都能被统计到,而Flink当前也只实现了以文件为输入的FileInputFormat的预算统计FileBaseStatistics。


原文发布时间为:2017-03-28

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
SQL 分布式计算 大数据
统一批处理流处理——Flink批流一体实现原理
统一批处理流处理——Flink批流一体实现原理
1480 0
统一批处理流处理——Flink批流一体实现原理
|
3月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
11月前
|
人工智能 算法 调度
马腾宇团队新出大模型预训练优化器,比Adam快2倍,成本减半
马腾宇团队新出大模型预训练优化器,比Adam快2倍,成本减半
|
SQL 存储 运维
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
本次分享主要介绍阿里云实时计算平台从 2.0 基于 Yarn 的架构到 3.0 云原生时代的演进,以及在 3.0 平台上一些核心功能的建设实践,如健康分,智能诊断,细粒度资源,作业探查以及企业级安全的建设等。
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
|
SQL 分布式计算 算法
Apache Spark 2.2中基于成本的优化器(CBO)
Apache Spark 2.2中基于成本的优化器(CBO)
196 0
Apache Spark 2.2中基于成本的优化器(CBO)
|
流计算
《朱翥、贺小令|更快更稳更易用:Flink 自适应批处理能力演》电子版地址
朱翥、贺小令|更快更稳更易用:Flink 自适应批处理能力演
77 0
《朱翥、贺小令|更快更稳更易用:Flink 自适应批处理能力演》电子版地址
|
运维 安全 Cloud Native
基于云原生的集群自愈系统 Flink Cluster Inspector(成本侧)
1. 业务背景与挑战1.1 实时计算集群现状关于热点机器处理一直是阿里云 Flink 集群运维的一大痛点,不管在日常还是大促都已经是比较严重的问题,同时这也是分布式系统的老大难问题。而在今年整个阿里云成本控制的背景下,随着集群水位的逐步抬升,热点问题愈发严重。日均有上千次的热点机器出现,并且在晚上业务高峰期,整个热点持续时间会超过 60min,对于业务以及对于平台影响是比较大的。(集群日均数千次机
基于云原生的集群自愈系统 Flink Cluster Inspector(成本侧)
|
SQL 缓存 运维
更快更稳更易用: Flink 自适应批处理能力演进
朱翥、贺小令在 9.24 Apache Flink Meetup 的演讲内容整理。
更快更稳更易用: Flink 自适应批处理能力演进
|
Apache 流计算 Python
Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计
Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计
308 0
Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计
|
SQL 机器学习/深度学习 存储
从 Spark 做批处理到 Flink 做流批一体
Flink 做流批一体在 linkedIn 的一些探索经验
从 Spark 做批处理到 Flink 做流批一体