采样算法
上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。此时,如果先对每分区内的所有数据进行遍历,再记录出数据总量会显得很低效,因此Flink选择借助于水塘抽样算法(https://en.wikipedia.org/wiki/Reservoir_sampling)来解决这个问题。
水塘抽样法是一种在线抽样法,可以在不知道样本总量或因样本数量太大而无法载入内存的情况下实现等概率抽样。
在实现时,Flink参考了IBM研究中心对该算法进行改进的一篇论文(Tirthapura, S., & Woodruff, D. P. (2011). Optimal Random Sampling from Distributed Streams Revisited. Lecture Notes in Computer Science),该论文对水塘抽样算法进行了改进以支持对大规模数据流进行随机采样,当输入元素是分布式且跨多个站点,这些站点之间的通信基于一个中央协调器。该算法被封装在ReservoirSamplerWithoutReplacement和ReservoirSamplerWithReplacement这两个类中。它们的继承关系图如下:
这两个采样类的实现基本都遵循如下两步:第一步,在每个分区中为其中的每个元素生成权重,选择权重最大的top K个元素作为每个分区的输出;第二步,从第一步的每个分区收集的K个元素中(此时总共是K * 分区个数的元素数目)选择权重最大的top K个元素。对于这两个采样类而言,第二步两者都是一致的,这部分的逻辑被封装在它们的父类DistributedRandomSampler中。区别在于第一步,ReservoirSamplerWithoutReplacement为每个输入元素生成一个随机数作为其权重,所以其不会重复选择元素,而ReservoirSamplerWithReplacement在第一步会为每个元素生成k次权重,这会导致一个元素可能会在计算top K时被多次选中。
就实现而言,第一步在DistributedRandomSampler中被定义为抽象方法sampleInPartition供子类实现,并要求在单个分区上执行,第二步则由DistributedRandomSampler自行实现,方法名为sampleInCoordinator,在一个全局归约函数中执行。
Flink基于MapPartition实现了一个UDF名为SampleInPartition,将两个采样算法的第一步应用其中,其对应的计划节点的并行度跟改写前的source节点的并行度一致。而对于第二步,Flink基于GroupReduceFunction实现了一个名为SampleInCoordinator的UDF用于归并所有来自各个SampleInPartition的样本输出,它会在全局归约函数中执行协调端的总体采样逻辑。用户必须确保该计划节点的并行度为1,才能使其成为唯一的中央协调器。当采样的样本数据确定之后就可以确定边界了,承担该职责的是范围分区构建器(RangeBoundaryBuilder),它是函数RichMapPartitionFunction的UDF实现,其计划节点的并行度跟SampleInCoordinator所对应的并行度保持一致。采样分支的并行化Dataflow示意图如下:
下面我们来分析一下代码实现,先确定的是样本总量(也就是top K的K的值),计算方式为每个分区的样本数乘以通道连接的下游目标范围分区的并行度(因为其关系到最终范围的划分边界):
final int sampleSize = SAMPLES_PER_PARTITION * targetParallelism;
这里,SAMPLES_PER_PARTITION常量表示每个分区的采样数,默认值为1000条。
在样本总量确定之后,就可以进行采样了,采样的具体实现并没什么特别的,就是按照上面的分析来实现,不再细述。我们主要来看一下如何根据最终的样本数据确定范围分区的每个分区的边界。
第一步对样本进行排序:
Collections.sort(sampledData, new Comparator<T>() {
@Override
public int compare(T first, T second) {
return comparator.compare(first, second);
}
});
第二步采用平均划分法来计算每个分区的边界,边界被存储于一个二维数组中,因为根据样本提取的临界值将会作为比较器的键存储在Object[]中。
int boundarySize = parallelism - 1;
Object[][] boundaries = new Object[boundarySize][];
if (sampledData.size() > 0) {
//计算拆分的段
double avgRange = sampledData.size() / (double) parallelism;
int numKey = comparator.getFlatComparators().length;
//每个并行度(分区)一个边界值
for (int i = 1; i < parallelism; i++) {
//计算得到靠近段尾的采样记录作为边界界定标准
T record = sampledData.get((int) (i * avgRange));
Object[] keys = new Object[numKey];
comparator.extractKeys(record, keys, 0);
boundaries[i-1] = keys;
}
}
计算得到的boundaries会被输出到广播通道:
final NamedChannel broadcastChannel = new NamedChannel("RangeBoundaries", rbPlanNode);
broadcastChannel.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
broadcastChannel.setTarget(ariPlanNode);
List<NamedChannel> broadcastChannels = new ArrayList<>(1);
broadcastChannels.add(broadcastChannel);
ariPlanNode.setBroadcastInputs(broadcastChannels);
广播通道连接着采样分支的尾部和数据处理分支的头部。
原文发布时间为:2017-04-07
本文作者:vinoYang
本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。