Flink批处理优化器之范围分区重写

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 为最终计划应用范围分区重写 Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下: final ExecutionEnvironment env = ExecutionEnvironment.


为最终计划应用范围分区重写

Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env);
ds.partitionByRange(0).withOrders(Order.ASCENDING, Order.DESCENDING);

在使用范围分区这一特性时,需要尽可能保证各分区所处理的数据集均衡性以最大化利用计算资源并减少作业的执行时间。为此,优化器提供了范围分区重写器(RangePartitionRewriter)来对范围分区的分区策略进行优化,使其尽可能平均地分配数据,避免数据倾斜。要做到这一点需要对数据集的范围有足够的“了解”,RangePartitionRewriter通过对数据集进行采样来得到分区的范围。接下来我们就来分析RangePartitionRewriter的实现细节。

范围分区重写器

范围分区重写器(RangePartitionRewriter)同样遍历的是最终选择的计划并作用于计划节点(PlanNode),其主要用于在后置遍历时对传输策略为范围分区节点的输入端通道的连接情况进行重写,核心逻辑如下:

//提取当前所有的计划节点的输入通道
final Iterable<Channel> inputChannels = node.getInputs();
//遍历输入通道
for (Channel channel : inputChannels) {
    ShipStrategyType shipStrategy = channel.getShipStrategy();
    // 确保优化的通道的数据传输策略为范围分区
    if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

        if(channel.getDataDistribution() == null) {
            if (node.isOnDynamicPath()) {
                throw new InvalidProgramException("Range Partitioning not supported within iterations " 
                    + " if users do not supply the data distribution.");
            }

            //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
            PlanNode channelSource = channel.getSource();
            List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
            channelSource.getOutgoingChannels().remove(channel);
            channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
        }
    }
}

上述代码段的关键在于对rewriteRangePartitionChannel方法的调用,它封装了对最终计划进行改写的逻辑,改写产生的逻辑Dataflow对比示意图如下:

由上图可见,改写后的逻辑dataflow被拆分成两个分支:上层分支主要完成的功能是采样跟构建范围边界,我们将其简称为“采样分支”;下层分支则用于对记录分区的索引进行查找、路由以及相关处理,可简称为“数据处理分支”。两分支之间有一个衔接关系在于:采样分支最终会输出“范围边界”,并将其以广播变量的形式传递给数据处理分支(见图中的虚线部分),数据处理分支将依据范围边界为来自source的记录查找该归属的分区编号。你可能会产生疑惑:依照这种表述来看,采样分支和数据处理分支是有前后的时序依赖关系的,而单纯的逻辑Dataflow中从source分拆的两个分支通常没有这种关系。那么Flink是如何保证该依赖关系的呢?答案在于数据处理分支的第一个channel,其数据交换模式(DataExchangeMode)被设置为Batch模式(见图中括号的标注,没有特别备注的数据交换模式默认都是Pipeline),Batch模式将数据生产者跟消费者解耦并使得它们不必时刻互相依赖(当数据都生产完成之后,消费者才消费),这也避免了数据处理分支开始处理数据流时还没有收到来自采样分支的范围边界广播变量。

具体而言,其核心流程可分解为如下六步:

  1. 为每个分区采样固定数目的记录作为样本;
  2. 让中央协调器从每个分区的样本中采样固定数目的样本作为最终的样本;
  3. 基于最终样本数据构建范围边界;
  4. 将范围边界作为广播变量传递同时为每个记录构建<分区编号,记录>的二元组并输出然后以自定义分区来分区记录;
  5. 找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号;
  6. 连接目标节点

关于采样算法的细节我们将会在下一小节专门进行分析,因此这里我们先假设已采样完成并从广播变量中得到了范围边界。接下来我们来分析数据处理分支的核心逻辑。当记录到来后需要确定它要落到哪个分区,这需要对范围边界集合进行查找并定位分区编号,优化器提供了一个RangeBoundaries接口,其定义了一个方法来提供该功能:

int getRangeIndex(T record);

其通用实现CommonRangeBoundaries使用二分查找来实现该方法:

public int getRangeIndex(T record) {
    return binarySearch(record);
}

CommonRangeBoundaries将会被应用在一个名为AssignRangeIndex的UDF(扩展自:RichMapPartitionFunction)中。AssignRangeIndex首先获取“范围边界”这一广播变量,然后构建CommonRangeBoundaries的实例,随之遍历当前聚集的分区数据并一一查找其分区编号以构建二元组,然后输出到下游,代码如下:

public void mapPartition(Iterable<IN> values, Collector<Tuple2<Integer, IN>> out) throws Exception {
    List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("RangeBoundaries");
    if (broadcastVariable == null || broadcastVariable.size() != 1) {
        throw new RuntimeException("AssignRangeIndex require a single RangeBoundaries as broadcast input.");
    }
    Object[][] boundaryObjects = (Object[][]) broadcastVariable.get(0);
    RangeBoundaries rangeBoundaries = new CommonRangeBoundaries(typeComparator.createComparator(), 
        boundaryObjects);

    Tuple2<Integer, IN> tupleWithPartitionId = new Tuple2<>();

    for (IN record : values) {
        tupleWithPartitionId.f0 = rangeBoundaries.getRangeIndex(record);
        tupleWithPartitionId.f1 = record;
        out.collect(tupleWithPartitionId);
    }
}

以AssignRangeIndex构建的运算符所产生的计划节点连接着自定义的分区器来对为记录路由到指定的分区:

//以下标为0的字段(也即上面查找到的分区索引)作为分区依据
final FieldList keys = new FieldList(0);
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys, 
    idPartitioner, DataExchangeMode.PIPELINED);

当记录(此时已是上面的二元组了)被路由到正确的分区之后,分区编号已没有用了,不需要再往下游传输了,优化器又定义了一个名为RemoveRangeIndex的UDF来移除分区编号,具体的做法是只输出二元组里下标为1的字段。最终将以RemoveRangeIndex构建的运算符所生成的计划节点替换通道原先的source节点并使得其与target节点进行连接。


原文发布时间为:2017-04-09

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
118 1
|
11月前
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
11月前
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
56 1
|
4月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用合集之如何重写序列化器
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 分布式计算 大数据
统一批处理流处理——Flink批流一体实现原理
统一批处理流处理——Flink批流一体实现原理
1575 0
统一批处理流处理——Flink批流一体实现原理
|
1月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
56 0
|
4月前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
567 2
Flink DataStream API 批处理能力演进之路
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用合集之支持sink到多分区的kafka ,还能保持有序吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
流计算
Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
【1月更文挑战第24天】【1月更文挑战第117篇】Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
107 6