全网最详细4W字Flink入门笔记(上) 5

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(上)

分区策略

在 Apache Flink 中,分区(Partitioning)是将数据流按照一定的规则划分成多个子数据流或分片,以便在不同的并行任务或算子中并行处理数据。分区是实现并行计算和数据流处理的基础机制。Flink 的分区决定了数据在作业中的流动方式,以及在并行任务之间如何分配和处理数据。

在 Flink 中,数据流可以看作是一个有向图,图中的节点代表算子(Operators),边代表数据流(Data Streams)。数据从源算子流向下游算子,这些算子可能并行地处理输入数据,而分区就是决定数据如何从一个算子传递到另一个算子的机制。

shuffle

场景:增大分区、提高并行度,解决数据倾斜

DataStream → DataStream

分区元素随机均匀分发到下游分区,网络开销比较大

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()

console result:上游数据比较随意的分发到下游

2> 1
1> 4
7> 10
4> 6
6> 3
5> 7
8> 2
1> 5
1> 8
1> 9

rebalance

场景:增大分区、提高并行度,解决数据倾斜

DataStream → DataStream

轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val shuffleStream = stream.rebalance
shuffleStream.print()
env.execute()

console result:上游数据比较均匀的分发到下游

8> 6
3> 1
5> 3
7> 5
1> 7
2> 8
6> 4
4> 2
3> 9
4> 10

rescale

场景:减少分区  防止发生大量的网络传输   不会发生全量的重分区

DataStream → DataStream

通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素

注意:rescale发生的是本地数据传输,而不需要通过网络传输数据,比如taskmanager的槽数。简单来说,上游的数据只会发送给本TaskManager中的下游。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1内容分发给stream2:1和stream2:2

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
5
9

stream2:2

3
7

stream2:3

2
6
10

stream2:4

4
8

broadcast

场景:需要使用映射表、并且映射表会经常发生变动的场景

DataStream → DataStream

上游中每一个元素内容广播到下游每一个分区中

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1、2内容广播到了下游每个分区中

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9
2
4
6
8
10

global

场景:并行度降为1

DataStream → DataStream

上游分区的数据只分发给下游的第一个分区

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1、2内容只分发给了stream2:1

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9
2
4
6
8
10

forward

场景:一对一的数据分发,map、flatMap、filter 等都是这种分区策略

DataStream → DataStream

上游分区数据分发到下游对应分区中

partition1->partition1

partition2->partition2

注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:

Forward partitioning does not allow change of parallelism
* Upstream operation: Source: Sequence Source-1 parallelism: 2,
* downstream operation: Sink: Unnamed-4 parallelism: 4
* stream.forward.writeAsText("./data/stream2").setParallelism(4)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result:stream1:1->stream2:1、stream1:2->stream2:2

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9

stream2:2

2
4
6
8
10

keyBy

场景:与业务场景匹配

DataStream → DataStream

根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区

MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result:根据元素Hash值分发到下游分区中

PartitionCustom

DataStream → DataStream

通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区

object ShuffleOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1,10).map((_,1))
    stream.writeAsText("./data/stream1")
    stream.partitionCustom(new customPartitioner(),0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }
  class customPartitioner extends Partitioner[Long]{
    override def partition(key: Long, numPartitions: Int): Int = {
      key.toInt % numPartitions
    }
  }
}




本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
470 3
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
Java Linux API
flink入门-流处理
flink入门-流处理
423 0
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
356 0
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
388 3
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
176 5
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
221 0
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
2160 3
|
分布式计算 大数据 Hadoop
终于学完了阿里云大数据架构师推荐的Flink入门与实战PDF
Flink项目是大数据计算领域冉冉升起的一颗新星。大数据计算引擎的发展经历了几个过程,从第1代的MapReduce,到第2代基于有向无环图的Tez,第3代基于内存计算的Spark,再到第4代的Flink。因为Flink可以基于Hadoop进行开发和使用,所以Flink并不会取代Hadoop,而是和Hadoop紧密结合。
终于学完了阿里云大数据架构师推荐的Flink入门与实战PDF
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
201 0

热门文章

最新文章