Flink批处理优化器之范围分区重写采用算法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 采样算法 上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。

采样算法

上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。此时,如果先对每分区内的所有数据进行遍历,再记录出数据总量会显得很低效,因此Flink选择借助于水塘抽样算法(https://en.wikipedia.org/wiki/Reservoir_sampling)来解决这个问题。

水塘抽样法是一种在线抽样法,可以在不知道样本总量或因样本数量太大而无法载入内存的情况下实现等概率抽样。

在实现时,Flink参考了IBM研究中心对该算法进行改进的一篇论文(Tirthapura, S., & Woodruff, D. P. (2011). Optimal Random Sampling from Distributed Streams Revisited. Lecture Notes in Computer Science),该论文对水塘抽样算法进行了改进以支持对大规模数据流进行随机采样,当输入元素是分布式且跨多个站点,这些站点之间的通信基于一个中央协调器。该算法被封装在ReservoirSamplerWithoutReplacement和ReservoirSamplerWithReplacement这两个类中。它们的继承关系图如下:

这两个采样类的实现基本都遵循如下两步:第一步,在每个分区中为其中的每个元素生成权重,选择权重最大的top K个元素作为每个分区的输出;第二步,从第一步的每个分区收集的K个元素中(此时总共是K * 分区个数的元素数目)选择权重最大的top K个元素。对于这两个采样类而言,第二步两者都是一致的,这部分的逻辑被封装在它们的父类DistributedRandomSampler中。区别在于第一步,ReservoirSamplerWithoutReplacement为每个输入元素生成一个随机数作为其权重,所以其不会重复选择元素,而ReservoirSamplerWithReplacement在第一步会为每个元素生成k次权重,这会导致一个元素可能会在计算top K时被多次选中。

就实现而言,第一步在DistributedRandomSampler中被定义为抽象方法sampleInPartition供子类实现,并要求在单个分区上执行,第二步则由DistributedRandomSampler自行实现,方法名为sampleInCoordinator,在一个全局归约函数中执行。

Flink基于MapPartition实现了一个UDF名为SampleInPartition,将两个采样算法的第一步应用其中,其对应的计划节点的并行度跟改写前的source节点的并行度一致。而对于第二步,Flink基于GroupReduceFunction实现了一个名为SampleInCoordinator的UDF用于归并所有来自各个SampleInPartition的样本输出,它会在全局归约函数中执行协调端的总体采样逻辑。用户必须确保该计划节点的并行度为1,才能使其成为唯一的中央协调器。当采样的样本数据确定之后就可以确定边界了,承担该职责的是范围分区构建器(RangeBoundaryBuilder),它是函数RichMapPartitionFunction的UDF实现,其计划节点的并行度跟SampleInCoordinator所对应的并行度保持一致。采样分支的并行化Dataflow示意图如下:

下面我们来分析一下代码实现,先确定的是样本总量(也就是top K的K的值),计算方式为每个分区的样本数乘以通道连接的下游目标范围分区的并行度(因为其关系到最终范围的划分边界):

final int sampleSize = SAMPLES_PER_PARTITION * targetParallelism;

这里,SAMPLES_PER_PARTITION常量表示每个分区的采样数,默认值为1000条。

在样本总量确定之后,就可以进行采样了,采样的具体实现并没什么特别的,就是按照上面的分析来实现,不再细述。我们主要来看一下如何根据最终的样本数据确定范围分区的每个分区的边界。

第一步对样本进行排序:

Collections.sort(sampledData, new Comparator<T>() {

    @Override
    public int compare(T first, T second) {
        return comparator.compare(first, second);
    }

});

第二步采用平均划分法来计算每个分区的边界,边界被存储于一个二维数组中,因为根据样本提取的临界值将会作为比较器的键存储在Object[]中。

int boundarySize = parallelism - 1;
Object[][] boundaries = new Object[boundarySize][];
if (sampledData.size() > 0) {
    //计算拆分的段
    double avgRange = sampledData.size() / (double) parallelism;
    int numKey = comparator.getFlatComparators().length;
    //每个并行度(分区)一个边界值
    for (int i = 1; i < parallelism; i++) {
        //计算得到靠近段尾的采样记录作为边界界定标准
        T record = sampledData.get((int) (i * avgRange));
        Object[] keys = new Object[numKey];
        comparator.extractKeys(record, keys, 0);
        boundaries[i-1] = keys;
    }
}

计算得到的boundaries会被输出到广播通道:

final NamedChannel broadcastChannel = new NamedChannel("RangeBoundaries", rbPlanNode);
broadcastChannel.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
broadcastChannel.setTarget(ariPlanNode);
List<NamedChannel> broadcastChannels = new ArrayList<>(1);
broadcastChannels.add(broadcastChannel);
ariPlanNode.setBroadcastInputs(broadcastChannels);

广播通道连接着采样分支的尾部和数据处理分支的头部。


原文发布时间为:2017-04-07

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
算法 测试技术 C++
【动态规划】【C++算法】2518. 好分区的数目
【动态规划】【C++算法】2518. 好分区的数目
|
6月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
131 1
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
63 1
|
6月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用合集之如何重写序列化器
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
211 0
|
4月前
|
算法 Java 开发者
Java面试题:Java内存探秘与多线程并发实战,Java内存模型及分区:理解Java堆、栈、方法区等内存区域的作用,垃圾收集机制:掌握常见的垃圾收集算法及其优缺点
Java面试题:Java内存探秘与多线程并发实战,Java内存模型及分区:理解Java堆、栈、方法区等内存区域的作用,垃圾收集机制:掌握常见的垃圾收集算法及其优缺点
39 0
|
5月前
|
存储 NoSQL 算法
Redis集群,集群的概念 三种主流分片方式1.哈希求余 一致性哈希算法:方案三:哈希槽分区算法问题一Redis集群是最多有16384个分片吗问题二:为什么是16384个,集群扩容:1.新的主节点
Redis集群,集群的概念 三种主流分片方式1.哈希求余 一致性哈希算法:方案三:哈希槽分区算法问题一Redis集群是最多有16384个分片吗问题二:为什么是16384个,集群扩容:1.新的主节点
|
6月前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
607 2
Flink DataStream API 批处理能力演进之路