1:讲讲flink checkpoint原理?对齐式和非对齐式checkpoint有什么区别?
checkpoint作用?
保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。checkpoint是一种容错恢复机制
checkpoint保存的是什么数据?
当前检查点开始时数据源(例如Kafka)中消息的offset。
记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
checkpoint的执行流程?
1:Checkpoint Coordinator 周期性的向所有 source 节点 trigger Checkpoint;
2:source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint
3:当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpointcoordinator
4:下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator
5:最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件
barrier对齐和非对齐的区别?
checkpoint底层逻辑是通过插入序号单调递增的barrier,把无界的数据流划分成逻辑上的数据段,并通过段落标记barrier 来为这些数据段,加持'事务’特性:
每一段数据流要么被完整成功处理;
要么回滚一切不完整的影响(状态变化)
最终需要保证:快照在各个算子间的状态必须统一(必须是经过了相同数据的影响之后的状态值)
barrier对齐与barrier不对齐区别在于:
不对齐:at least once 最少一次,消息不会丢失,但是可能会重复
对齐:exactly once 精确一次性,但是处理性能会降低。
2:讲讲watermark工作机制?
watermark的意义:
标识 Flink 任务的事件时间进度,从而能够推动事件时间窗口的触发、计算。
解决事件时间窗口的乱序问题。
watermark的触发时机:
1:watermark时间 >= window_end_time 即max(timestamp, currentMaxTimestamp....)-allowedLateness >= window_end_time
2:在[window_start_time,window_end_time)中有数据存在
乱序处理可归纳为:
窗口window 的作用是为了周期性的获取数据。
watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
allowLateNess是将窗口关闭时间再延迟一段时间。
sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
watermark的分类:
flink1.10以前有以下两种:
方式一:AssignerWithPeriodicWatermarks【1.10及以前的版本,建议使用这种方式】
周期性水位线
周期性的生成 watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间
常用的实现类是:BoundedOutOfOrdernessTimestampExtractor(延时时间)
方式二:AssignerWithPunctuatedWatermarks:
标点水位线
阶段性的生成 watermark,即每来一条数据就生成一个wartermark。这种方式下,窗口的触发与时间无关,而是决定于何时收到标记事件。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
flink1.11.0及版本以上推荐使用:WatermarkStrategy
方式一:固定乱序长度策略(forBoundedOutOfOrderness)
通过调用WatermarkStrategy对象上的forBoundedOutOfOrderness方法来实现,接收一个Duration类型的参数作为最大乱序(out of order)长度。WatermarkStrategy对象上的withTimestampAssigner方法为从事件数据中提取时间戳提供了接口
一般使用这种策略
方式二:单调递增策略(forMonotonousTimestamps)
通过调用WatermarkStrategy对象上的forMonotonousTimestamps方法来实现,无需任何参数,相当于将forBoundedOutOfOrderness策略的最大乱序长度outOfOrdernessMillis设置为0。
方式三:不生成策略(noWatermarks)
WatermarkStrategy.noWatermarks()
注意点:
多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!
数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着watermarkStrategy也不会获得任何数据去生成watermark,在这种情况下可以通过设置有一个空闲时间(withIdleness),当超过这个时间则将这个分片或分区标记为空闲状态。
watermark对齐参数:WatermarkStrategy .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
3:flink双流join?
1:flink window join
join()
通俗理解,将两条实时流中元素分配到同一个时间窗口中完成Join。两条实时流数据缓存在Window State中,当窗口触发计算时,执行join操作。(窗口对齐才会触发)
支持Tumbling Window Join (滚动窗口),Sliding Window Join (滑动窗口),Session Widnow Join(会话窗口),支持处理时间和事件时间两种时间特征。
源码核心总结:windows窗口 + state存储 + 双层for循环执行join()
val env = ... // kafka 订单流 val orderStream = ... // kafka 订单明细流 val orderDetailStream = ... orderStream.join(orderDetailStream) .where(r => r._1) //订单id .equalTo(r => r._2) //订单id .window(TumblingProcessTimeWindows.of( Time.seconds(60))) .apply {(r1, r2) => r1 + " : " + r2} .print()
coGroup()
coGroup的作用和join基本相同,但有一点不一样的是,如果未能找到新到来的数据与另一个流在window中存在的匹配数据,仍会将其输出。
只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。
它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,
可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。
clickRecordStream .coGroup(orderRecordStream) .where(record -> record.getMerchandiseId()) .equalTo(record -> record.getMerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() { @Override public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception { for (AnalyticsAccessLogRecord accessRecord : accessRecords) { boolean isMatched = false; for (OrderDoneLogRecord orderRecord : orderRecords) { // 右流中有对应的记录 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice())); isMatched = true; } if (!isMatched) { // 右流中没有对应的记录 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null)); } } } }) .print().setParallelism(1);
2:Flink Interval Join
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。
clickRecordStream .keyBy(record -> record.getMerchandiseId()) .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId())) .between(Time.seconds(-30), Time.seconds(30)) .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() { @Override public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception { collector.collect(StringUtils.join(Arrays.asList( accessRecord.getMerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount() ), '\t')); } }) .print().setParallelism(1);
3:Flinksql Regular Join
Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,
Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。-号代表回撤,+号代表最新数据
SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id
因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。
4:flinksql Time-Windowed Join
Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。
以更常用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询如下:
SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR
5:flinksql 时态表
虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。
Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。
时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。
SELECT * FROM RatesHistory; currency_time currency rate ============= ========= ==== 09:00:00 US Dollar 102 09:00:00 Euro 114 09:00:00 Yen 1 10:45:00 Euro 116 11:15:00 Euro 119 11:49:00 Pounds 108 -- 视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。 CREATE VIEW versioned_rates AS SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件时间 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键 ORDER BY currency_time DESC) AS rowNum FROM RatesHistory ) WHERE rowNum = 1; -- 视图 `versioned_rates` 将会产出如下的 changelog: (changelog kind) currency_time currency rate ================ ============= ========= ==== +(INSERT) 09:00:00 US Dollar 102 +(INSERT) 09:00:00 Euro 114 +(INSERT) 09:00:00 Yen 1 +(UPDATE_AFTER) 10:45:00 Euro 116 +(UPDATE_AFTER) 11:15:00 Euro 119 +(INSERT) 11:49:00 Pounds 108
比如典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,需要和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容如下:
SELECT o.amount * r.rate FROM Orders o, LATERAL Table(Rates(o.time)) r WHERE o.currency = r.currency
值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录都可以与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点以前的记录是不可见的。这意味着我们只需要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。因为 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据可以安全地被清理掉。
时态表注意事项:
Temporal Table 可提供历史某个时间点上的数据。
Temporal Table 根据时间来跟踪版本。
Temporal Table 需要提供时间属性和主键。
Temporal Table 一般和关键词 LATERAL TABLE 结合使用。
Temporal Table 在基于 ProcessingTime 时间属性处理时,每个主键只保存最新版本的数据。
Temporal Table 在基于 EventTime 时间属性处理时,每个主键保存从上个 Watermark 到当前系统时间的所有版本。
Append-Only 表 Join 右侧 Temporal Table ,本质上还是左表驱动 Join ,即从左表拿到 Key ,根据 Key 和时间(可能是历史时间)去右侧 Temporal Table 表中查询。
Temporal Table Join 目前只支持 Inner Join。
Temporal Table Join 时,右侧 Temporal Table 表返回最新一个版本的数据。
4:讲讲flink状态后端,怎么选择,各有什么优缺点?
flink1.12之前:
MemoryStateBackend(默认使用)
该持久化存储主要将快照数据保存到 JobManager 的内存中,仅适合作为测试以 及快照的数据量非常小时使用,并不推荐用作大规模商业部署
局限性:
每个独立的状态(state)默认限制大小为5MB, 可以通过构造函数增加容量;
状态的大小不能超过akka的framesize大小。参考:配置 ;
聚合状态(aggregate state )必须放入JobManager的内存
适用场景:
本地开发和调试。状态很少的作业,例如仅包含一次记录功能的作业(Map,FlatMap。Filter,...), kafka 的消费者需要很少的状态
FsStateBackend
基于文件系统进行存储, 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。
局限性:
由于带备份的状态先会存在 TaskManager 中,故状态的大小不能超过 TaskManager 的内存,以免发生OOM。
适用场景:
FsStateBackend 适用于处理大状态,长窗口,或大键值状态的有状态处理任务。
FsStateBackend 比较适合用于高可用方案。
可以在生产环境中使用。
RocksDBState-Backend
RocksDBStateBackend将工作状态保存在RocksDB数据库(RocksDB 是一个基于 LSM 实现的 KV 数据库,所以个人理解State数据部分存储在内存中,一部分存储在磁盘文件上)。
通过checkpoint, 整个RocksDB数据库被复制到配置的文件系统中。最小元数据保存jobManager的内存中。RocksDBStateBackend可以通过enableIncrementalCheckpointing参数配置是否进行增量Checkpoint(而MemoryStateBackend 和 FsStateBackend不能)。
跟FsStateBackend 不同的是,RocksDBStateBackend仅支持异步快照(asynchronous snapshots)。
局限性:
由于RocksDB的JNI API 是基于byte[] 的, 故 RocksDB 支持的单 key 和单 value 的大小不能超过 2^31 字节。
对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。
当使用 RocksDB 时,状态大小只受限于TaskManager的磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。使用RocksDB 的权衡点在于所有的状态相关的操作都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。
适用场景:
RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用(HA)方案。
RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端。增量 checkpoint 非常适用于超大状态的场景。
最好是对状态读写性能要求不高的作业。
flink 1.12之后
HashMapStateBackend是内存计算,读写速度非常快;
可以支持写入Memory和文件系统。
JobManagerCheckpointStorage() : jobmanager内存
JobManagerCheckpointStorage("file://path"): jobmanager本地文件
FileSystemCheckpointStorage(): filesystem中(oss、hdfs等)
局限性:
随着时间不停地增长,会耗尽内存资源
EmbeddedRocksDBStateBackend(同RocksDBState-Backend)