5:flink 维表关联的各种方案和优缺点?
1:预加载维表
通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在probe流map()方法中与维表数据进行关联。RichMapFunction中open方法里加载维表数据到内存的方式特点如下:
优点:实现简单. 缺点:因为数据存于内存维度信息全量加载到内存中,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。下面是一个例子:
/ 使用redis ,这个代码核心是 open方法里开java 定时调度更新redis到算子本地变量 // mysql预加载到redis也是一样的 // 不只是可以从 MySQL 中去读取,可以自定义各种数据源、 //各种 DB,甚至可以读取文件,也可以读取 Flink 的 Distributed Cache。 static class SimpleFlatMapFunction extends RichFlatMapFunction<String,OutData>{ private transient ConcurrentHashMap<String, String> hashMap = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //redis Jedis jedisCluster = RedisFactory.getJedisCluster(); ScanResult<Map.Entry<String, String>> areas =jedisCluster.hscan("areas", "0"); List<Map.Entry<String, String>> result = areas.getResult(); System.out.println("更新缓存"); hashMap = new ConcurrentHashMap<>(); for (Map.Entry<String, String> stringStringEntry : result) { String key = stringStringEntry.getKey(); String[] split = stringStringEntry.getValue().split(","); for (String s : split) { hashMap.put(s, key); } } jedisCluster.close(); ScheduledExecutorService scheduledExecutorService =Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("更新缓存"); Jedis jedisCluster = RedisFactory.getJedisCluster(); ScanResult<Map.Entry<String, String>> areas = jedisCluster.hscan("areas", "0"); List<Map.Entry<String, String>> result = areas.getResult(); hashMap = new ConcurrentHashMap<>(); for (Map.Entry<String, String> stringStringEntry : result) { String key = stringStringEntry.getKey(); String[] split = stringStringEntry.getValue().split(","); for (String s : split) { hashMap.put(s, key); } } jedisCluster.close(); } }, 0, 3, TimeUnit.SECONDS); } @Override public void flatMap(String s, Collector<OutData> collector) throws Exception { OriginData originData = JSONObject.parseObject(s, OriginData.class); String countryCode = originData.countryCode; ArrayList<Data> data = originData.data; String dt = originData.dt; String coutryCode = hashMap.get(countryCode); for (Data datum : data) { OutData of = OutData.of(dt, coutryCode, datum.type, datum.score, datum.level); collector.collect(of); } } }
2:热存储维表
这种方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:
优点:维度数据量不受内存限制,可以存储很大的数据量。缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。
3:广播维表
利用Flink的Broadcast State将维度数据流广播到下游做join操作。特点如下:
优点:维度数据变更后可以即时更新到结果中。
缺点:数据保存在内存中,支持的维度数据量比较小。
使用:
1.将维度数据发送到Kafka作为流S1。事实数据是流S2。
2.定义状态描述符MapStateDescriptor,如descriptor。
3.结合状态描述符,将S1广播出去,如S1.broadcast(descriptor),形成广播流(BroadcastStream) B1。
4.事实流S2和广播流B1连接,形成连接后的流BroadcastConnectedStream BC。
5.基于BC流,在KeyedBroadcastProcessFunction/BroadcastProcessFunction中实现Join的逻 辑处理。
4:异步IO+guava
异步IO主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题
使用Aysnc I/O的前提条件
1)为了实现以异步I/O访问数据库或K/V存储,数据库等需要有能支持异步请求的client;若是没有,可以通过创建多个同步的client并使用线程池处理同步call的方式实现类似并发的client,但是这方式没有异步I/O的性能好。
2)AsyncFunction不是以多线程方式调用的,一个AsyncFunction实例按顺序为每个独立消息发送请求;
Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,不过目前很多系统都支持异步IO客户端。但是如果使用异步就要涉及到三个问题:
超时:如果查询超时那么就认为是读写失败,需要按失败处理;
并发数量:如果并发数量太多,就要触发Flink的反压机制来抑制上游的写入;
返回顺序错乱:顺序错乱了要根据实际情况来处理,Flink支持两种方式:允许乱序、保证顺序
使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用guava Cache
优点:维度数据不受限于内存,支持较多维度数据
缺点:需要热存储资源,维度更新反馈到结果有延迟(热存储导入,cache)
适用场景:维度数据量大,可接受维度更新有一定的延迟。
5:Temporal table function join(时态表)
Temporal table是持续变化表上某一时刻的视图,Temporal table function是一个表函数,(历史表)
传递一个时间参数,返回Temporal table这一指定时刻的视图。
可以将维度数据流映射为Temporal table,主流与这个Temporal table进行关联,可以关联到某一个版本(历史上某一个时刻)的维度数据。
优点:维度数据量可以很大,维度数据更新及时,不依赖外部存储,可以关联不同版本的维度数据。
缺点:只支持在Flink SQL API中使用。
6:flink异步io?
异步io前置条件:
- 对外部系统进行异步IO访问的客户端API
- 或者在没有这样的客户端的情况下,可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转变为有限的并发客户端。但是,这种方法通常比适当的异步客户端效率低。
异步io流式转换三步:
1、实现用来分发请求的AsyncFunction,用来向数据库发送异步请求并设置回调
2、获取操作结果的callback,并将它提交给ResultFuture
3、将异步I/O操作应用于DataStream, 当异步I/O请求超时时,默认情况下会抛出异常并重新启动Job,如果希望处理超时,可以覆盖AsyncFunction的timeout方法
异步io参数含义:
- Timeout:超时参数定义了异步操作执行多久未完成、最终认定为失败的时长,如果启用重试,则可能包括多个重试请求。它可以防止一直等待得不到响应的请求。
- Capacity:容量参数定义了可以同时进行的异步请求数。即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。
- AsyncRetryStrategy: 重试策略参数定义了什么条件会触发延迟重试以及延迟的策略,例如,固定延迟、指数后退延迟、自定义实现等。
超时处理 #
当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。如果你想处理超时,可以重写 AsyncFunction#timeout 方法。重写 AsyncFunction#timeout 时别忘了调用 ResultFuture.complete() 或者 ResultFuture.completeExceptionally() 以便告诉Flink这条记录的处理已经完成。如果超时发生时你不想发出任何记录,你可以调用 ResultFuture.complete(Collections.emptyList()) 。
结果的顺序 #
AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序。Flink 提供两种模式控制结果记录以何种顺序发出。
- 无序模式:异步请求一结束就立刻发出结果记录。流中记录的顺序在经过异步 I/O 算子之后发生了改变。当使用 处理时间 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。此模式使用 AsyncDataStream.unorderedWait(...) 方法。
- 有序模式: 这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时)。由于记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的延迟和 checkpoint 开销。此模式使用 AsyncDataStream.orderedWait(...) 方法。
事件时间 #
当流处理应用使用事件时间时,异步 I/O 算子会正确处理 watermark。对于两种顺序模式,这意味着以下内容:
- 无序模式:Watermark 既不超前于记录也不落后于记录,即 watermark 建立了顺序的边界。只有连续两个 watermark 之间的记录是无序发出的。在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。在一个 watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。
这意味着存在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的延迟和管理开销。开销大小取决于 watermark 的频率。
- 有序模式:连续两个 watermark 之间的记录顺序也被保留了。开销与使用处理时间 相比,没有显著的差别。
请记住,摄入时间 是一种特殊的事件时间,它基于数据源的处理时间自动生成 watermark。
详细说明:异步 I/O | Apache Flink
7:flink savepoint与checkpoint的应用和区别?
savepoint:
触发:用户手动触发
应用:有计划的备份,比如修改代码,并行度。
savepoint是对某个时间进行全局快照,在快照时,会对整个flink程序有一定影响。
checkpoint:
触发:根据用户设置的参数,交给flink程序自行触发。
应用:某个task的任务,由网络抖动导致超时异常。checkpoint能够快速恢复。
特点:任务量轻,自动从故障点恢复。
8:flink task slot 并行度之间的关系
TaskSlot是静态的概念,代表着Taskmanager具有的并发执行能力
parallelism是动态的概念,是指程序运行时实际使用的并发能力
Flink中slot是任务执行所申请资源的最小单元,同一个TaskManager上的所有slot都只是做了内存分离,没有做CPU隔离。
每一个TaskManager都是一个JVM进程,如果某个TaskManager 上只有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行,如果有多个 slot 就意味着更多 subtask 共享同一 JVM。
一般情况下有多少个subtask,就是有多少个并行线程,而并行执行的subtask要发布到不同的slot中去执行。
Flink 默认会将能链接的算子尽可能地进行链接,也就是算子链,flink 会将同一个算子链分组内的subtask都发到同一个slot去执行,也就是说一个slot可能要执行多个subtask,即多个线程。
flink 可以根据需要手动地将各个算子隔离到不同的 slot 中。
一个任务所用的总共slot为所有资源隔离组所占用的slot之和,同一个资源隔离组内,按照算子的最大并行度来分配slot。
经验上讲Slot的数量与CPU-core的数量一致为好。但考虑到超线程,可以slotNumber=2*cpuCore.
在Yarn集群中Job分离模式下,Taskmanger的数量=slot数量/并行度(向上取整)。slotNumber>=taskmanger*并行度;并行度上限不能大于slot的数量
9:flink 内存管理
jobmanger内存:
可以看到JobManager的内存模型很简单了,主要是堆内存,堆外内存,JVM Metaspace和JVM Overhead组成。
TaskManger内存:
总体内存
1、Total Process Memory:Flink Java 应用程序(包括用户代码)和 JVM 运行整个进程所消耗的总内存。
总进程内存(Total Process Memory) = Flink 总内存 + JVM 元空间 + JVM 执行开销
Total Flink Memory
仅 Flink Java 应用程序消耗的内存,包括用户代码,但不包括 JVM 为其运行而分配的内存。
Flink 总内存 = Framework堆内外 + task 堆内外 + network + managed Memory
JVM Heap (JVM 堆上内存)
Framework Heap :框架堆内存,Flink框架本身使用的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源中。
Task Heap : 任务堆内存,如果内存大小没有指定,它将被推导出为总 Flink 内存减去框架堆内存、框架堆外内存、任务堆外内存、托管内存和网络内存。Task执行用户代码时所使用的堆上内存。
TaskManager 的堆内存主要被分成了三个部分:
Network Buffers:一定数量的32KB大小的 buffer,主要用于数据的网络传输。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,可以通过 taskmanager.network.numberOfBuffers 来配置。
Memory Manager Pool:这是一个由 MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占了堆内存的 70% 的大小。
Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。
注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是可以给用户代码使用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。(flink1.13之后已经可以实现流批统一了)
Off-Heap Mempry(JVM 堆外内存)
Managed memory: 托管内存
由 Flink 管理的原生托管内存,保留用于排序、哈希表、中间结果缓存和 RocksDB 状态后端。
托管内存由 Flink 管理并分配为原生内存(堆外)。以下工作负载使用托管内存:
流式作业可以将其用于 RocksDB 状态后端。流和批处理作业都可以使用它进行排序、哈希表、中间结果的缓存。流作业和批处理作业都可以使用它在 Python 进程中执行用户定义的函数。
托管内存配置时如果两者都设置,则大小将覆盖分数。如果大小和分数均未明确配置,则将使用默认分数。
DirectMemory:JVM 直接内存
Framework Off-Heap Memory:Flink 框架堆外内存。即 TaskManager 本身所占用的对外内存,不计入 Slot 资源。
Task Off-Heap :Task 堆外内存。专用于Flink 框架的堆外直接(或本机)内存。Network Memory:网络内存。网络数据交换所使用的堆外内存大小,如网络数据交换 缓冲区。
JVM metaspace:JVM 元空间。
Flink JVM 进程的元空间大小,默认为256MB。
JVM Overhead:JVM执行开销。
JVM 执行时自身所需要的内容,包括线程堆栈、IO、 编译缓存等所使用的内存,这是一个上限分级成分的的总进程内存
内存数据结构
内存段:MemorySegment,是 Flink 中最小的内存分配单元,即可以是堆上内存(Java 的 byte 数组),也可以是堆外内存(基于 Netty 的 DirectByteBuffer)
内存页:是 MemorySegment 之上的数据访问视图,数据读取抽象为 DataInputView,数据写入抽象为 DataOutputView。使用时就无需关心 MemorySegment 的细节,会自 动处理跨 MemorySegment 的读取和写入。
buffer:Task 算子之间在网络层面上传输数据,使用的是 Buffer,申请和释放由 Flink自行管理
Buffer 资源池:BufferPool 用来管理 Buffer,包含 Buffer 的申请、释放、销毁、可用 Buffer 通知等,实现类是 LocalBufferPool,每个 Task 拥有自己的 LocalBufferPool。BufferPoolFactory 用 来 提 供 BufferPool 的 创 建 和 销 毁 , 唯 一 的 实 现 类 是NetworkBufferPool , 每 个 TaskManager 只 有 一 个 NetworkBufferPool 。同一个TaskManager 上的 Task 共享 NetworkBufferPool,在 TaskManager 启动的时候创建并分配内存。
10:flink三种提交方式的区别?
session模式:
集群生命周期:客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。(作业共享集群资源)
其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。
适用场景:Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业,需要频繁提交小job的场景。
pre-job模式:
集群生命周期:在 Flink pre-Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。(每个作业单独启动集群)
其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。
适用场景:Per-Job模式一般用来部署那些长时间运行的作业。
application模式(推荐生产使用):
集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。(每个job独享一个集群)
总结:
application:每个job独享一个集群,job退出则集群退出。main方法在集群上运行。
session:多个job共享集群资源,job退出集群也不会退出。main方法在客户端运行。
pre-job:每个job独享一个集群,job退出则集群退出。main方法在客户端运行。
11:flink 背压原理?定位与解决方式?
反压是什么?
反压是流式系统中关于处理能力的动态反馈机制,并且是从下游到上游的反馈。(上下游数据生产和消费速率不均衡)
上游 Producer 向下游 Consumer 发送数据,在发送端和接受端都有相应的 Send Buffer 和 Receive Buffer,但是上游 Producer 生成数据的速率比下游 Consumer 消费数据的速率快。
此时可能会出现两种情况:
下游消费者会丢弃新到达的数据,因为下游消费者的缓冲区放不下
为了不丢弃数据,所以下游消费者的 Receive Buffer 持续扩张,最后耗尽消费者的内存,OOM,程序挂掉
Flink 的 checkpoint 反压还会影响到两项指标: checkpoint 时长和 state 大小。
反压原理?
v1.5之前 TCP反压
流程太长,省略...
TCP的反压,通过callback实现的,当socket发送数据到receive buffer后,receiver反馈给send端,目前receiver端的buffer还有多少剩余空间,让后send端会根据剩余空间,控制发送速率。
TCP反压的弊端:
① 单个Task的反压,阻塞了整个TaskManager的socket,导致checkpoint barrier也无法传播,最终导致checkpoint时间增长甚至checkpoint超时失败。② 反压路径太长,导致反压时间延迟
v1.5之后 Credit-based反压
Credit信用值,backlog值===》专有buffer 队列
反压机制作用于 Flink 的应用层,即在 ResultSubPartition 和 InputChannel这一层引入了反压机制。
① 每一次 ResultPartition 向 InputGate 发送数据的时候,都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息。(backlog 的作用是为了让消费端感知到我们生产端的情况)
② 如果下游有充足的 Buffer ,就会返还给上游 Credit (表示剩余 buffer 数量),告知发送消息(图上两个虚线是还是采用 Netty 和 Socket 进行通信)。
详细:Flink如何分析及处理反压? - 民宿 - 博客园 (cnblogs.com)
反压定位?
要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:
通过 Flink Web UI 自带的反压监控面板;
通过 Flink Task Metrics。
监控面板:
backpressure Tab页面:backpressure status 和backpressured/Idle/BUsy
idleTimeMsPerSecond
busyTimeMsPerSecond
backPressuredTimeMsPerSecond
Flink Task Metrics
解释:
① outPoolUsage 和 inPoolUsage 同为低表明当前 Subtask 是正常的,同为高分别表明当前 Subtask 被下游反压。
② 如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。
③ 如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高。
注意:反压有时是短暂的且影响不大,比如来自某个 channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下可以不用处理。
① floatingBuffersUsage 为高则表明反压正在传导至上游。
② exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。
反压常见原因和解决方案?
(1)数据倾斜
通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。或者增加并发或者增加机器
(2)用户代码的执行效率
对 TaskManager 进行 CPU profile,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动。
(3)TaskManager 的内存以及 GC
TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。