Partitioning Strategies
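In Apache Flink, partitioning splits a data stream into multiple sub-streams (partitions) according to some rule, so that different parallel tasks or operator instances can process the data in parallel. Partitioning is the basic mechanism behind parallel computation and stream processing in Flink. It determines how data flows through a job and how it is distributed among parallel tasks.
A Flink data stream can be viewed as a directed graph. The nodes are operators and the edges are data streams. Data flows from source operators to downstream operators, which may process their input in parallel. Partitioning decides how records are passed from one operator to the next, that is, which parallel instance of the downstream operator receives each record.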
shuffle
Use case: scaling out to more partitions or higher parallelism, and mitigating data skew
DataStream → DataStream
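Each element is sent to a downstream partition chosen uniformly at random, so data ends up roughly evenly distributed. Network overhead is relatively high.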
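import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// A single upstream partition makes the effect of shuffle easy to see
val stream = env.generateSequence(1, 10).setParallelism(1)
println(stream.getParallelism) // prints 1
// print() runs with the environment's default parallelism
stream.shuffle.print()
env.execute()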
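console result: upstream data is distributed to the downstream partitions at random. The prefix N> is the downstream subtask index, starting at 1.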
2> 1
1> 4
7> 10
4> 6
6> 3
5> 7
8> 2
1> 5
1> 8
1> 9
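The routing rule can be simulated in plain Scala. This is a sketch and not Flink code: the object ShuffleSim and its assign function are hypothetical. Each element independently picks a downstream channel uniformly at random, which is the behavior described above. The fixed seed only makes the demo repeatable; a real job uses unseeded randomness.

```scala
import scala.util.Random

// Hypothetical simulation of shuffle partitioning (not Flink code):
// every element independently picks a downstream channel uniformly at random.
object ShuffleSim {
  // Returns (0-based channel index, element) pairs in input order
  def assign(elements: Seq[Long], numChannels: Int, rnd: Random): Seq[(Int, Long)] =
    elements.map(e => (rnd.nextInt(numChannels), e))

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42) // fixed seed, for a repeatable demo only
    // Print like Flink's print(): subtask index + 1, then the element
    assign(1L to 10L, numChannels = 8, rnd).foreach { case (ch, e) =>
      println(s"${ch + 1}> $e")
    }
  }
}
```

With enough elements every channel receives some data, but the per-channel counts fluctuate, unlike the strict rotation used by rebalance.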
rebalance
Use case: scaling out to more partitions or higher parallelism, and mitigating data skew
DataStream → DataStream
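Elements are distributed round-robin, evenly across the downstream partitions, so each downstream partition receives a similar amount of data. This is very useful when data is skewed. Network overhead is relatively high, because every upstream partition may send to every downstream partition.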
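import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// One upstream partition. print() uses the environment's default parallelism,
// which was apparently 8 for the output below (prefixes 1> to 8>)
val stream = env.generateSequence(1, 10).setParallelism(1)
val rebalanced = stream.rebalance
rebalanced.print()
env.execute()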
console result: upstream data is distributed evenly, in turn, across the downstream partitions
8> 6
3> 1
5> 3
7> 5
1> 7
2> 8
6> 4
4> 2
3> 9
4> 10
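The round-robin rule can be simulated in plain Scala. This is a sketch and not Flink code: RebalanceSim and startOffset are hypothetical names. The sender keeps a counter that starts at some offset and advances by one channel per element. In a real job, the starting point is an implementation detail that may be chosen randomly per sender, so here it is simply a parameter. With 8 channels and startOffset = 1, the sketch reproduces the element-to-subtask mapping of the console result above (3> 1, 4> 2, ..., 8> 6, 1> 7, ..., 4> 10). It does not reproduce the interleaved print order.

```scala
// Hypothetical simulation of rebalance's round-robin (not Flink code)
object RebalanceSim {
  // Returns (0-based channel, element); the first element goes to startOffset + 1
  def assign(elements: Seq[Long], numChannels: Int, startOffset: Int): Seq[(Int, Long)] =
    elements.zipWithIndex.map { case (e, i) =>
      ((startOffset + 1 + i) % numChannels, e)
    }

  def main(args: Array[String]): Unit =
    assign(1L to 10L, numChannels = 8, startOffset = 1).foreach { case (ch, e) =>
      println(s"${ch + 1}> $e") // same N> prefix convention as print()
    }
}
```

Because the counter only ever advances by one, 80 elements over 8 channels give exactly 10 per channel, whatever the starting offset.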
rescale
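Use case: redistributing data when the parallelism changes, either down or up (as in the 2 → 4 example below), while avoiding large amounts of network transfer. Unlike rebalance, no full repartitioning takes place.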
DataStream → DataStream
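Elements are distributed round-robin, but each upstream partition only sends to a fixed subset of the downstream partitions rather than to all of them. If the upstream parallelism is 2 and the downstream parallelism is 4, each upstream partition feeds 2 downstream partitions. If the upstream parallelism is 4 and the downstream parallelism is 2, every 2 upstream partitions feed 1 downstream partition.
Note: rescale can rely on local data transfer instead of sending data over the network, depending on configuration such as the number of slots per TaskManager. Put simply, upstream data is typically sent only to downstream subtasks in the same TaskManager.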
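import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
// Upstream: 2 partitions, written to ./data/stream1/1 and ./data/stream1/2
stream.writeAsText("./data/stream1").setParallelism(2)
// Downstream: 4 partitions; each upstream partition feeds 2 of them
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()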
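Result: the contents of stream1:1 are distributed to stream2:1 and stream2:2, and the contents of stream1:2 to stream2:3 and stream2:4. Here streamX:N denotes output file N under ./data/streamX, written by subtask N.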
stream1:1
1 3 5 7 9
stream1:2
2 4 6 8 10
stream2:1
1 5 9
stream2:2
3 7
stream2:3
2 6 10
stream2:4
4 8
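The wiring can be simulated in plain Scala. This is a sketch and not Flink code: RescaleSim and route are hypothetical. It assumes the downstream parallelism is a multiple k of the upstream parallelism. Under that assumption, upstream subtask u is connected to the contiguous block of downstream subtasks [u * k, (u + 1) * k) and cycles over that block round-robin, starting from its first channel. With the upstream contents above, it reproduces the four stream2 files.

```scala
// Hypothetical simulation of rescale (not Flink code).
// Assumption: downParallelism = k * upstream parallelism.
object RescaleSim {
  // upstream(u) holds the elements of upstream subtask u, in order;
  // the result maps each 0-based downstream channel to the elements it receives
  def route(upstream: Seq[Seq[Long]], downParallelism: Int): Map[Int, Seq[Long]] = {
    val up = upstream.size
    require(downParallelism % up == 0, "this sketch only covers down = k * up")
    val k = downParallelism / up
    upstream.zipWithIndex
      .flatMap { case (elems, u) =>
        // subtask u only ever talks to channels u*k .. u*k + k - 1
        elems.zipWithIndex.map { case (e, i) => (u * k + i % k, e) }
      }
      .groupBy(_._1)
      .map { case (ch, pairs) => ch -> pairs.map(_._2) }
  }

  def main(args: Array[String]): Unit = {
    val upstream = Seq(Seq(1L, 3L, 5L, 7L, 9L), Seq(2L, 4L, 6L, 8L, 10L))
    route(upstream, 4).toSeq.sortBy(_._1).foreach { case (ch, es) =>
      println(s"stream2:${ch + 1} ${es.mkString(" ")}")
    }
  }
}
```

The point of the design is visible in the sketch: no upstream subtask ever addresses a channel outside its own block, which is what lets the runtime keep these transfers local.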
broadcast
Use case: every downstream instance needs the full data set, for example a mapping (lookup) table that changes frequently
DataStream → DataStream
Every upstream element is broadcast to every downstream partition
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// Every element is copied to all 4 downstream partitions
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()
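Result: the contents of stream1:1 and stream1:2 are broadcast to every downstream partition. Only stream2:1 is listed below; stream2:2 to stream2:4 hold the same 10 elements, possibly in a different order.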
stream1:1
1 3 5 7 9
stream1:2
2 4 6 8 10
stream2:1
1 3 5 7 9 2 4 6 8 10
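A minimal plain-Scala sketch of the broadcast rule follows. BroadcastSim is a hypothetical name and this is not Flink code. Every downstream channel receives the full input, so the total volume sent is the input size multiplied by the downstream parallelism. This is why broadcast suits small data sets such as lookup tables. The sketch simply concatenates the upstream partitions, whereas in a real run their interleaving varies.

```scala
// Hypothetical simulation of broadcast partitioning (not Flink code)
object BroadcastSim {
  // Each 0-based downstream channel receives every upstream element
  def route(upstream: Seq[Seq[Long]], downParallelism: Int): Map[Int, Seq[Long]] =
    (0 until downParallelism).map(ch => ch -> upstream.flatten).toMap

  def main(args: Array[String]): Unit = {
    val upstream = Seq(Seq(1L, 3L, 5L, 7L, 9L), Seq(2L, 4L, 6L, 8L, 10L))
    route(upstream, 4).toSeq.sortBy(_._1).foreach { case (ch, es) =>
      println(s"stream2:${ch + 1} ${es.mkString(" ")}")
    }
  }
}
```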
global
Use case: sending all data to a single partition, which effectively reduces the parallelism to 1
DataStream → DataStream
All upstream data is sent only to the first downstream partition
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// All elements go to the first of the 4 downstream partitions
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()
Result: the contents of stream1:1 and stream1:2 all go to stream2:1. The other downstream partitions receive no data.
stream1:1
1 3 5 7 9
stream1:2
2 4 6 8 10
stream2:1
1 3 5 7 9 2 4 6 8 10
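The global rule fits in a few lines of plain Scala. This is a sketch and not Flink code, and GlobalSim is a hypothetical name. With the two upstream partitions from the example, channel 0 gets all ten elements and channels 1 to 3 stay empty. The downstream operator may be configured with parallelism 4, but only one instance does any work.

```scala
// Hypothetical simulation of global partitioning (not Flink code)
object GlobalSim {
  // Channel 0 receives everything; all other channels receive nothing
  def route(upstream: Seq[Seq[Long]], downParallelism: Int): Map[Int, Seq[Long]] =
    (0 until downParallelism)
      .map(ch => ch -> (if (ch == 0) upstream.flatten else Seq.empty[Long]))
      .toMap

  def main(args: Array[String]): Unit = {
    val upstream = Seq(Seq(1L, 3L, 5L, 7L, 9L), Seq(2L, 4L, 6L, 8L, 10L))
    route(upstream, 4).toSeq.sortBy(_._1).foreach { case (ch, es) =>
      println(s"stream2:${ch + 1} ${es.mkString(" ")}")
    }
  }
}
```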
forward
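Use case: one-to-one data distribution. When the upstream and downstream parallelism are equal and no other partitioning is specified, Flink uses forward by default, for example between a source and a following map, flatMap or filter. This is also what allows such operators to be chained.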
DataStream → DataStream
Each upstream partition sends its data to the corresponding downstream partition:
partition1->partition1
partition2->partition2
Note: the upstream and downstream must have the same number of partitions (parallelism). Otherwise the following exception is thrown:
Forward partitioning does not allow change of parallelism
Upstream operation: Source: Sequence Source-1 parallelism: 2,
downstream operation: Sink: Unnamed-4 parallelism: 4
It is triggered by: stream.forward.writeAsText("./data/stream2").setParallelism(4)
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// forward requires the same parallelism (2) on both sides
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()
Result: stream1:1 -> stream2:1, stream1:2 -> stream2:2
stream1:1
1 3 5 7 9
stream1:2
2 4 6 8 10
stream2:1
1 3 5 7 9
stream2:2
2 4 6 8 10
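A plain-Scala sketch of the one-to-one rule and its parallelism check follows. ForwardSim is a hypothetical name and this is not Flink code. Upstream subtask i feeds downstream subtask i, and the sketch refuses to route when the two parallelisms differ, mirroring the constraint behind the exception quoted above.

```scala
// Hypothetical simulation of forward partitioning (not Flink code)
object ForwardSim {
  def route(upstream: Seq[Seq[Long]], downParallelism: Int): Map[Int, Seq[Long]] = {
    // Forward only makes sense as a 1:1 wiring, so parallelism must match
    if (upstream.size != downParallelism)
      throw new UnsupportedOperationException(
        s"forward needs equal parallelism, got ${upstream.size} -> $downParallelism")
    upstream.zipWithIndex.map { case (elems, i) => i -> elems }.toMap
  }

  def main(args: Array[String]): Unit = {
    val upstream = Seq(Seq(1L, 3L, 5L, 7L, 9L), Seq(2L, 4L, 6L, 8L, 10L))
    route(upstream, 2).toSeq.sortBy(_._1).foreach { case (ch, es) =>
      println(s"stream2:${ch + 1} ${es.mkString(" ")}")
    }
  }
}
```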
keyBy
Use case: grouping records by a business key, for example aggregating per user or per product
DataStream → DataStream
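Each element is routed by the hash of its key (the value returned by the key selector). The key hash is not simply taken modulo the number of downstream partitions. Instead, it is first mapped to a key group, and the key group is then mapped to a downstream partition:
keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism
targetPartition = keyGroup * parallelism / maxParallelism
Here maxParallelism is the job's maximum parallelism (the number of key groups), and parallelism is the number of downstream partitions. All elements with the same key therefore always land in the same downstream partition.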
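import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// Position-based keyBy(0) only works on tuple types; for a DataStream[Long],
// use a key selector instead (here the element itself is the key)
stream.keyBy(x => x).writeAsText("./data/stream2").setParallelism(2)
env.execute()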
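Result: elements are distributed to the downstream partitions according to the hash of their key, and elements with equal keys end up in the same partition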
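The two-step mapping can be made concrete with a plain-Scala re-implementation: key hash, then key group, then downstream subtask. This is a sketch and not Flink code, and KeyGroupSim is a hypothetical name. Its murmurHash follows the single-int MurmurHash3 variant that Flink's MathUtils.murmurHash is understood to use, so check it against the Flink version you run before relying on exact values. The maxParallelism of 128 is an assumption, chosen because 128 is Flink's lower bound for the default maximum parallelism.

```scala
// Hypothetical re-implementation of keyBy routing (not Flink code):
// key.hashCode -> murmurHash -> key group -> downstream subtask index
object KeyGroupSim {
  // MurmurHash3 finalization mix
  private def bitMix(h0: Int): Int = {
    var h = h0
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  // MurmurHash3 of one 4-byte block (seed 0), made non-negative
  def murmurHash(code0: Int): Int = {
    var code = code0
    code *= 0xcc9e2d51
    code = Integer.rotateLeft(code, 15)
    code *= 0x1b873593
    code = Integer.rotateLeft(code, 13)
    code = code * 5 + 0xe6546b64
    code ^= 4 // input length in bytes
    code = bitMix(code)
    if (code >= 0) code
    else if (code != Int.MinValue) -code
    else 0
  }

  // Step 1: key -> key group in [0, maxParallelism)
  def keyGroup(key: Any, maxParallelism: Int): Int =
    murmurHash(key.hashCode) % maxParallelism

  // Step 2: key group -> subtask; contiguous key-group ranges share a subtask
  def operatorIndex(keyGroupId: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroupId * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // assumption, see the lead-in
    val parallelism = 2
    for (k <- 1L to 10L) {
      val kg = keyGroup(k, maxParallelism)
      println(s"key=$k keyGroup=$kg subtask=${operatorIndex(kg, maxParallelism, parallelism)}")
    }
  }
}
```

Routing through key groups rather than directly by parallelism is what lets keyed state be redistributed in whole key groups when the job is rescaled.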
partitionCustom
DataStream → DataStream
A user-defined partitioner decides which downstream partition each element is sent to
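import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object ShuffleOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    // (value, 1) pairs; field 0 is used as the partitioning key
    val stream = env.generateSequence(1, 10).map((_, 1))
    stream.writeAsText("./data/stream1")
    stream
      .partitionCustom(new CustomPartitioner(), 0)
      .writeAsText("./data/stream2")
      .setParallelism(4)
    env.execute()
  }

  // Sends key k to downstream partition k % numPartitions (assumes non-negative keys)
  class CustomPartitioner extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int =
      key.toInt % numPartitions
  }
}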
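The article gives no output for this example, so the expected distribution can be worked out with a plain-Scala simulation of the same rule. This is a sketch and not Flink code: CustomPartitionSim is hypothetical. It uses Math.floorMod instead of %, which gives identical results for the non-negative keys 1 to 10 but would keep negative keys inside [0, numPartitions). With 4 downstream partitions, partition 0 gets keys 4 and 8, partition 1 gets 1, 5 and 9, partition 2 gets 2, 6 and 10, and partition 3 gets 3 and 7. In the real job these would land in stream2:1 to stream2:4, though the order within a file may vary.

```scala
// Hypothetical simulation of the custom partitioner above (not Flink code)
object CustomPartitionSim {
  // Same rule as CustomPartitioner, but floorMod keeps negative keys in range
  def partition(key: Long, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions.toLong).toInt

  // Groups keys by the 0-based partition they would be sent to
  def route(keys: Seq[Long], numPartitions: Int): Map[Int, Seq[Long]] =
    keys.groupBy(partition(_, numPartitions))

  def main(args: Array[String]): Unit =
    route(1L to 10L, 4).toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"stream2:${p + 1} ${ks.mkString(" ")}")
    }
}
```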
That's all for this article. Thanks for reading! If you spot any errors or have suggestions, please leave a comment.