在一段时间之前我们已介绍过IP(Interesting Property)对于优化器的意义以及它将对优化器的优化决策产生的影响。本篇我们将介绍Flink的批处理优化器中涉及到的所有的IP,我们将其统称为数据属性。后续我们会介绍Flink如何为优化器节点计算IP,并在之后的“剪枝”(pruning)阶段发挥作用。
数据属性
数据属性是个统称,来自于Flink优化器模块定义的子包名:dataproperties,需要注意的是这里属性的含义不是代码实现层面上类里的属性,而是指代对优化器的优化决策产生影响的一种指标。所以这里的属性对应到代码中的类。总体而言,有两类属性:
- 本地属性:用于表述对本地处理的任务(如,排序)产生影响的属性;
- 全局属性:用于表述对跨分区的数据传输(如,广播、哈希分区等)产生影响的属性;
再结合是否不可或缺,又可进一步细分为:
- LocalProperties:本地属性;
- GlobalProperties:全局属性;
- RequestedLocalProperties:必不可少的本地属性,是本地属性的子集,一旦缺少会导致对程序错误地优化并返回错误的结果;
- RequestedGlobalProperties:必不可少的全局属性,是全局属性的子集,一旦缺少会导致对程序错误地优化并返回错误的结果;
另外的两个额外的属性:
- InterestingProperties:它就是我们常说的IP,是一个属性容器类,包含了一系列的RequestedLocalProperties和RequestedGlobalProperties集合。IP将直接对优化器寻找最优计划产生影响,它将会从父运算符传递给子运算符(以sink为顶点的倒置的遍历树),并告知子运算符哪些属性可以帮助它获得最廉价的执行方案。例如,一个Reduce运算符,将告诉其子运算符分区信息,如果子运算符是join,那么它将根据获得的信息,保留数据分区形式并选择更合适的执行策略。
- PartitioningProperty:分区属性,枚举了所有被支持的分区类型;
以上这些属性类之间的关联关系如下图所示:
接下来我们会对以上这些属性类中的关键字段进行解读。
- LocalProperties
属性 | 类型 | 描述 |
---|---|---|
ordering(*) | Ordering | 一个分区内部的排序方式 |
groupedFields(*) | FieldList | 用于分组的字段集 |
uniqueFields | Set<FieldSet> | 在合并时值唯一的字段 |
* GlobalProperties
属性 | 类型 | 描述 |
---|---|---|
partitioning(*) | PartitioningProperty | 表示分区类型的属性 |
partitioningFields(*) | FieldList | 分区的字段 |
ordering(*) | Ordering | 如果分区方式为范围分区,该字段表示分区字段的排序顺序 |
uniqueFieldCombinations | Set<FieldSet> | 在合并时值唯一的字段 |
customPartitioner(*) | Partitioner<?> | 当partitioning指定为CUSTOM_PARTITIONING时,使用的分区器 |
distribution(*) | DataDistribution | 数据分布对象,当分区类型为RANCE_PARTITION时需要设置 |
上面两个表格中,被标记为”*”的属性,就是RequestedLocalProperties和RequestedGlobalProperties中的属性。
而InterestingProperties由RequestedLocalProperties以及RequestedGlobalProperties属性集合组成:
属性 | 类型 | 描述 |
---|---|---|
localProps | Set<RequestedLocalProperties> | 必备的本地属性集合 |
globalProps | Set<RequestedGlobalProperties> | 必备的全局属性集合 |
PartitioningProperty类枚举了跨分区或并行工作节点之间数据的shuffle形式:
- ANY_DISTRIBUTION:任何可能的数据分布形式,包括随机分区和完全复制;
- RANDOM_PARTITION:一种随机性的非复制型的数据分布方式;
- HASH_PARTITION:基于给定键的哈希分区方式;
- RANGE_PARTITION:基于特定键的范围分区方式;
- ANY_PARTITIONING:不在键上指定明确的分区方式;
- FULL_REPLICATION:将数据完全复制到每个并行的实例上去;
- FORCED_REBALANCED:强制重平衡,尽量保证每个分区上数据记录的均等;
- CUSTOM_PARTITIONING:自定义分区,可通过分区器(Partitioner)指定;
对于这些枚举值,哪些是事实上真正意义的分区呢?PartitioningProperty提供了一个isPartitioned方法来进行判断:
public boolean isPartitioned() {
return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION;
}
从代码段中可见,非FULL_REPLICATION、非FORCED_REBALANCED以及非ANY_DISTRIBUTION,其余的都被认为是真正意义上的分区。