Flink批处理优化器之成本估算

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。

成本估算

在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。

什么是成本

Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加、减、乘、除)以及对这些因素未知值的认定与校验。

“cost”一词也有译作:开销、代价,将其视为同义即可。

Flink当前将成本估算的因素划分为两大类:

  • 可量化的成本估算因素:指代通过跟踪一个可量化的测量指标可以计算出的成本估算因素(比如网络或I/O的字节数);
  • 启发式的成本估算因素:指代那些不可定量计算的成本估算因素,因此只能给出一些定性的经验值;

当前被纳入成本估算的因素如下:

  • 网络成本;
  • 磁盘I/O成本;
  • CPU成本;
  • 启发式网络成本;
  • 启发式磁盘成本;
  • 启发式CPU成本;

可量化的成本估算因素可能经常会被设置为未知的(UNKNOWN,在Costs中以字面常量值-1表示)。当可量化的成本估算因素被置为未知时,所有操作的成本都将变成未知的,因此这将导致在进行优化裁剪期间,无法决策出哪个偏向的操作。在这种情况下,启发式的成本估算因素必须能发挥作用,它应该包含一个值来确保以不同策略执行的运算符是可比较的(甚至在无法估算的情况下)。

如何估算成本

成本的估算借助于成本估算器(CostEstimator),CostEstimator定义了一系列增加成本的方法,这些方法有待具体的估算器实现,它们大致分为三大类:

  • 增加传输策略的成本;
  • 增加本地策略的成本;
  • 增加屏障的成本;

CostEstimator借助于以上这几类方法,可完成对一个运算符总成本的计算,具体的计算逻辑封装在方法costOperator中,该方法接收一个计划节点(PlanNode)参数,然后按照传输策略和本地策略分别进行枚举与计算。完整的方法如下:

public void costOperator(PlanNode n) { //构建一个成本对象用来存储总成本 final Costs totalCosts = new Costs(); //获得该节点的最少可用内存 final long availableMemory = n.getGuaranteedAvailableMemory(); //----------------------------- // 增加传输策略产生的成本 //----------------------------- //遍历该节点的所有输入端通道     for (Channel channel : n.getInputs()) { final Costs costs = new Costs(); //匹配当前通道的传输策略 switch (channel.getShipStrategy()) { case NONE: throw new CompilerException( "Cannot determine costs: Shipping strategy has not been set for an input."); case FORWARD: break; //随机重分区 case PARTITION_RANDOM: addRandomPartitioningCost(channel, costs); break; //哈希分区与自定义分区增加成本的方式相同 case PARTITION_HASH: case PARTITION_CUSTOM: addHashPartitioningCost(channel, costs); break; //范围分区 case PARTITION_RANGE: addRangePartitionCost(channel, costs); break; //广播 case BROADCAST: addBroadcastCost(channel, channel.getReplicationFactor(), costs); break; //强制重平衡分区 case PARTITION_FORCED_REBALANCE:
                addRandomPartitioningCost(channel, costs);
                break;
            default:
                throw new CompilerException("Unknown shipping strategy for input: " 
                    + channel.getShipStrategy());
        }

        //匹配当前通道的本地策略
        switch (channel.getLocalStrategy()) {
            case NONE:
                break;
            //排序与合并排序都增加本地的排序成本
            case SORT:
            case COMBININGSORT:
                addLocalSortCost(channel, costs);
                break;
            default:
                throw new CompilerException("Unsupported local strategy for input: " 
                    + channel.getLocalStrategy());
        }

        //增加屏障成本
        if (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE) {
            addArtificialDamCost(channel, 0, costs);
        }

        //如果通道在动态路径上,则需要调整成本计算的权重
        if (channel.isOnDynamicPath()) {
            costs.multiplyWith(channel.getCostWeight());
        }

        //将当前通道的成本加入总成本
        totalCosts.addCosts(costs);
    } 

    Channel firstInput = null;
    Channel secondInput = null;
    Costs driverCosts = new Costs();
    int costWeight = 1;

    //如果节点在动态路径上,则重新调整成本权重
    if (n.isOnDynamicPath()) {
        costWeight = n.getCostWeight();
    }

    //获得当前节点的所有输入端通道
    {
        Iterator<Channel> channels = n.getInputs().iterator();
        if (channels.hasNext()) {
            firstInput = channels.next();
        }
        if (channels.hasNext()) {
            secondInput = channels.next();
        }
    }

    //根据计划节点的执行策略来计算本地成本
    switch (n.getDriverStrategy()) {
        //以下这些执行策略不计算本地成本
        case NONE:
        case UNARY_NO_OP:
        case BINARY_NO_OP:
        case MAP:
        case MAP_PARTITION:
        case FLAT_MAP:

        case ALL_GROUP_REDUCE:
        case ALL_REDUCE:

        case CO_GROUP:
        case CO_GROUP_RAW:
        case SORTED_GROUP_REDUCE:
        case SORTED_REDUCE:

        case SORTED_GROUP_COMBINE:

        case ALL_GROUP_COMBINE:

        case UNION:

            break;

        //各种形式的合并成本
        case INNER_MERGE:
        case FULL_OUTER_MERGE:
        case LEFT_OUTER_MERGE:
        case RIGHT_OUTER_MERGE:
            addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
            break;

        //混合哈希join的成本(第一个输入边是构建边,第二个输入边是扫描边)
        case HYBRIDHASH_BUILD_FIRST:
        case RIGHT_HYBRIDHASH_BUILD_FIRST:
        case LEFT_HYBRIDHASH_BUILD_FIRST:
        case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
            addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
            break;

        //混合哈希join的成本(第二个输入边是构建边,第一个输入边是扫描边)
        case HYBRIDHASH_BUILD_SECOND:
        case LEFT_HYBRIDHASH_BUILD_SECOND:
        case RIGHT_HYBRIDHASH_BUILD_SECOND:
        case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
            addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
            break;

        //各种其他的执行策略
        case HYBRIDHASH_BUILD_FIRST_CACHED:
            addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
            break;
        case HYBRIDHASH_BUILD_SECOND_CACHED:
            addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
            break;
        case NESTEDLOOP_BLOCKED_OUTER_FIRST:
            addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_BLOCKED_OUTER_SECOND:
            addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_STREAMED_OUTER_FIRST:
            addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_STREAMED_OUTER_SECOND:
            addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
            break;
        default:
            throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name());
    }

    //将驱动器的执行成本加入到总成本,将得到的总成本作为当前节点的成本
    totalCosts.addCosts(driverCosts);
    n.setCosts(totalCosts);
}

DefaultCostEstimator继承自CostEstimator,作为默认的(也是唯一的)成本估算器。它实现了上面计算成本逻辑中调用的一系列增加成本的addXXX方法。这些方法中的绝大部分,又依赖于预算提供器(EstimateProvider)所提供的预算数据,然后根据不同的增加成本的算法逻辑,利用这些预算数据做计算。比如我们以新增广播成本的addBroadcastCost方法为示例,其实广播传输方式说白了就是将数据复制到当前运算符的所有输出通道中,因此这里对成本的计算取决于复制因子,代码如下:

public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
    //检查复制因子的合法性
    if (replicationFactor <= 0) {
        throw new IllegalArgumentException("The replication factor of must be larger than zero.");
    }

    if (replicationFactor > 0) {
        //所估算的需要输出数据的大小
        final long estOutShipSize = estimates.getEstimatedOutputSize();
        //如果数据大小小于等于零,则标记网络成本为“未知”
        if (estOutShipSize <= 0) {
            costs.setNetworkCost(Costs.UNKNOWN);
        } 
        //否则网络成本拿数据大小乘以复制因子
        else {
            costs.addNetworkCost(replicationFactor * estOutShipSize);
        }
        //增加启发式网络成本,通过启发式成本基数乘以复制因子后再扩大十倍
        costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
    } else {
        costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 1000);
    }
}

预算提供者

前面我们谈论了如何通过CostEstimator来估算成本,但其实CostEstimator是在已获得预算数据的基础上应用相关的算法来算出成本的,而用来估算成本的预算数据其实是来自预算提供者(EstimateProvider)。Flink批处理中所有的运算符都有一个基于优化器的内部表示,我们可以称它们为优化器运算符,这些运算符创建于优化操作之前,且它们都必须实现EstimateProvider接口。各个优化器运算符根据自己的实现以及语义将成本估算相关的信息暴露给外部查询。目前被纳入预算的信息有:

  • 输出的数据流大小:由接口方法getEstimatedOutputSize提供;
  • 输出的记录数:由接口方法getEstimatedNumRecords提供;
  • 单个输出记录的平均字节数:由接口方法getEstimatedAvgWidthPerOutputRecord提供;

在dag包下,EstimateProvider接口的继承关系图如下:

EstimateProvider

其中,OptimizerNode是所有被优化的运算符继承的基类,因此所有优化器运算符都是预算提供者。OptimizerNode为绝大部分的优化器运算符提供了统一的预算计算方法computeOutputEstimates。

为什么说是绝大部分运算符呢?因为有些运算符是特殊的,比如双输入端union运算符BinaryUnionNode以及迭代相关的运算符。

所有的运算符都会在优化时被遍历,Flink提供了一个编号及预算遍历器(IdAndEstimatesVisitor)来对所有运算符进行逐个遍历并计算预算,这一点体现在Optimizer的compile方法的下面这行代码中:

rootNode.accept(new IdAndEstimatesVisitor(this.statistics));

而在IdAndEstimatesVisitor的postVisit方法中即调用computeOutputEstimates方法来计算预算。下面,我们来分析一下预算是如何计算得出的,总得来说computeOutputEstimates的逻辑被分为两部分:

  • 各个具体的运算符计算它们特定的预算;
  • 根据编译提示(CompilerHints)覆盖原有的预算计算;

OptimizerNode将特定运算符的预算计算定义成名为computeOperatorSpecificDefaultEstimates的抽象方法开放给派生类根据自身的特定逻辑实现。然后,如果该运算符如果设置有CompilerHints的话,将会根据CompilerHints覆盖原有的预算结果。

所谓CompilerHints,它是封装了描述用户函数行为的编译提示,它可用于改进优化器对计划的选择。如果给某个运算符设置编译提示的话,那么在计算预算时,将会用它来覆盖运算符自身给出的中间结果的预算。当前,CompilerHints在优化器中没有得到太大的机会发挥。

因为CompilerHints没有被广泛应用,所以预算的计算还是依赖各个运算符具体提供,所以我们关注一下computeOperatorSpecificDefaultEstimates方法。该方法完全是按照具体运算符的语义特征来实现的,我们选择看其中的几个实现:

二元union运算符的预算就是累加其两个输入端:

protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
    long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
    this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 + card2;

    long size1 = getFirstPredecessorNode().getEstimatedOutputSize();
    long size2 = getSecondPredecessorNode().getEstimatedOutputSize();
    this.estimatedOutputSize = (size1 < 0 || size2 < 0) ? -1 : size1 + size2;
}

Cross运算符的处理方式是:

protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
    long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
    //输出的总记录数为第一个输入节点和第二个输入节点的记录数的乘积;
    this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2;

    //如果记录数大于等于零,则会计算输出数据的大小
    if (this.estimatedNumRecords >= 0) {
        //获得第一个、第二个输入节点的单条记录大小,两者相加则是cross运算符单条输出记录的大小
        float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
        float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
        float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;

        if (width > 0) {
            this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
        }
    }
}

从上面的两个运算符对预算的计算可见,它们大都依赖上游运算符的输出预算。而最初的预算肯定由source运算符决定,因为只有source才能知道数据的具体规模。

所以,我们来看一下DataSourceNode,很明显它作为数据的输入源,是最有可能了解初始数据集大小的运算符,为此Flink定义了一个专门用于统计的对象BaseStatistics,它用于统计对接外部的数据源的预算信息。但并非每个数据源的信息都能被统计到,而Flink当前也只实现了以文件为输入的FileInputFormat的预算统计FileBaseStatistics。


原文发布时间为:2017-03-28

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
SQL 分布式计算 大数据
统一批处理流处理——Flink批流一体实现原理
统一批处理流处理——Flink批流一体实现原理
1596 0
统一批处理流处理——Flink批流一体实现原理
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
211 0
|
6月前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
607 2
Flink DataStream API 批处理能力演进之路
|
6月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
机器学习/深度学习 算法
减少内存消耗、降低大模型训练成本,ACL杰出论文作者揭秘CAME优化器
减少内存消耗、降低大模型训练成本,ACL杰出论文作者揭秘CAME优化器
233 0
|
SQL 存储 运维
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
本次分享主要介绍阿里云实时计算平台从 2.0 基于 Yarn 的架构到 3.0 云原生时代的演进,以及在 3.0 平台上一些核心功能的建设实践,如健康分,智能诊断,细粒度资源,作业探查以及企业级安全的建设等。
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
|
人工智能 算法 调度
马腾宇团队新出大模型预训练优化器,比Adam快2倍,成本减半
马腾宇团队新出大模型预训练优化器,比Adam快2倍,成本减半
115 0
|
SQL 缓存 运维
更快更稳更易用: Flink 自适应批处理能力演进
朱翥、贺小令在 9.24 Apache Flink Meetup 的演讲内容整理。
更快更稳更易用: Flink 自适应批处理能力演进
|
SQL 分布式计算 算法
Apache Spark 2.2中基于成本的优化器(CBO)
Apache Spark 2.2中基于成本的优化器(CBO)
240 0
Apache Spark 2.2中基于成本的优化器(CBO)