开发者社区> nicenelly> 正文

Flink批处理优化器之范围分区重写

简介: 为最终计划应用范围分区重写 Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下: final ExecutionEnvironment env = ExecutionEnvironment.
+关注继续查看


为最终计划应用范围分区重写

Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env);
ds.partitionByRange(0).withOrders(Order.ASCENDING, Order.DESCENDING);

在使用范围分区这一特性时,需要尽可能保证各分区所处理的数据集均衡性以最大化利用计算资源并减少作业的执行时间。为此,优化器提供了范围分区重写器(RangePartitionRewriter)来对范围分区的分区策略进行优化,使其尽可能平均地分配数据,避免数据倾斜。要做到这一点需要对数据集的范围有足够的“了解”,RangePartitionRewriter通过对数据集进行采样来得到分区的范围。接下来我们就来分析RangePartitionRewriter的实现细节。

范围分区重写器

范围分区重写器(RangePartitionRewriter)同样遍历的是最终选择的计划并作用于计划节点(PlanNode),其主要用于在后置遍历时对传输策略为范围分区节点的输入端通道的连接情况进行重写,核心逻辑如下:

//提取当前所有的计划节点的输入通道
final Iterable<Channel> inputChannels = node.getInputs();
//遍历输入通道
for (Channel channel : inputChannels) {
    ShipStrategyType shipStrategy = channel.getShipStrategy();
    // 确保优化的通道的数据传输策略为范围分区
    if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

        if(channel.getDataDistribution() == null) {
            if (node.isOnDynamicPath()) {
                throw new InvalidProgramException("Range Partitioning not supported within iterations " 
                    + " if users do not supply the data distribution.");
            }

            //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
            PlanNode channelSource = channel.getSource();
            List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
            channelSource.getOutgoingChannels().remove(channel);
            channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
        }
    }
}

上述代码段的关键在于对rewriteRangePartitionChannel方法的调用,它封装了对最终计划进行改写的逻辑,改写产生的逻辑Dataflow对比示意图如下:

由上图可见,改写后的逻辑dataflow被拆分成两个分支:上层分支主要完成的功能是采样跟构建范围边界,我们将其简称为“采样分支”;下层分支则用于对记录分区的索引进行查找、路由以及相关处理,可简称为“数据处理分支”。两分支之间有一个衔接关系在于:采样分支最终会输出“范围边界”,并将其以广播变量的形式传递给数据处理分支(见图中的虚线部分),数据处理分支将依据范围边界为来自source的记录查找该归属的分区编号。你可能会产生疑惑:依照这种表述来看,采样分支和数据处理分支是有前后的时序依赖关系的,而单纯的逻辑Dataflow中从source分拆的两个分支通常没有这种关系。那么Flink是如何保证该依赖关系的呢?答案在于数据处理分支的第一个channel,其数据交换模式(DataExchangeMode)被设置为Batch模式(见图中括号的标注,没有特别备注的数据交换模式默认都是Pipeline),Batch模式将数据生产者跟消费者解耦并使得它们不必时刻互相依赖(当数据都生产完成之后,消费者才消费),这也避免了数据处理分支开始处理数据流时还没有收到来自采样分支的范围边界广播变量。

具体而言,其核心流程可分解为如下六步:

  1. 为每个分区采样固定数目的记录作为样本;
  2. 让中央协调器从每个分区的样本中采样固定数目的样本作为最终的样本;
  3. 基于最终样本数据构建范围边界;
  4. 将范围边界作为广播变量传递同时为每个记录构建<分区编号,记录>的二元组并输出然后以自定义分区来分区记录;
  5. 找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号;
  6. 连接目标节点

关于采样算法的细节我们将会在下一小节专门进行分析,因此这里我们先假设已采样完成并从广播变量中得到了范围边界。接下来我们来分析数据处理分支的核心逻辑。当记录到来后需要确定它要落到哪个分区,这需要对范围边界集合进行查找并定位分区编号,优化器提供了一个RangeBoundaries接口,其定义了一个方法来提供该功能:

int getRangeIndex(T record);

其通用实现CommonRangeBoundaries使用二分查找来实现该方法:

public int getRangeIndex(T record) {
    return binarySearch(record);
}

CommonRangeBoundaries将会被应用在一个名为AssignRangeIndex的UDF(扩展自:RichMapPartitionFunction)中。AssignRangeIndex首先获取“范围边界”这一广播变量,然后构建CommonRangeBoundaries的实例,随之遍历当前聚集的分区数据并一一查找其分区编号以构建二元组,然后输出到下游,代码如下:

public void mapPartition(Iterable<IN> values, Collector<Tuple2<Integer, IN>> out) throws Exception {
    List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("RangeBoundaries");
    if (broadcastVariable == null || broadcastVariable.size() != 1) {
        throw new RuntimeException("AssignRangeIndex require a single RangeBoundaries as broadcast input.");
    }
    Object[][] boundaryObjects = (Object[][]) broadcastVariable.get(0);
    RangeBoundaries rangeBoundaries = new CommonRangeBoundaries(typeComparator.createComparator(), 
        boundaryObjects);

    Tuple2<Integer, IN> tupleWithPartitionId = new Tuple2<>();

    for (IN record : values) {
        tupleWithPartitionId.f0 = rangeBoundaries.getRangeIndex(record);
        tupleWithPartitionId.f1 = record;
        out.collect(tupleWithPartitionId);
    }
}

以AssignRangeIndex构建的运算符所产生的计划节点连接着自定义的分区器来对为记录路由到指定的分区:

//以下标为0的字段(也即上面查找到的分区索引)作为分区依据
final FieldList keys = new FieldList(0);
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys, 
    idPartitioner, DataExchangeMode.PIPELINED);

当记录(此时已是上面的二元组了)被路由到正确的分区之后,分区编号已没有用了,不需要再往下游传输了,优化器又定义了一个名为RemoveRangeIndex的UDF来移除分区编号,具体的做法是只输出二元组里下标为1的字段。最终将以RemoveRangeIndex构建的运算符所生成的计划节点替换通道原先的source节点并使得其与target节点进行连接。


原文发布时间为:2017-04-09

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
京东搜索排序在线学习的 Flink 优化实践
本文由京东搜索算法架构团队分享,主要介绍 Apache Flink 在京东商品搜索排序在线学习中的应用实践
2403 0
Flink批处理优化器之成本估算
成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。
1365 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
25239 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18995 0
Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略
本文先通过源码简单过一下分区提交机制的两个要素——即触发(trigger)和策略(policy)的实现,然后用合并小文件的实例说一下自定义分区提交策略的方法。
2239 0
Apache Flink 在京东的实践与优化
Flink 助力京东实时计算平台朝着批流一体的方向演进。
698 0
hive 动态分区(Dynamic Partition)异常处理
Changing Hive Dynamic Partition Limits Symptoms: Hive enforces limits on the number of dynamic partitions that it creates.
1353 0
Flink批处理优化器之范围分区重写
为最终计划应用范围分区重写 Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下: final ExecutionEnvironment env = ExecutionEnvironment.
1401 0
+关注
716
文章
646
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载