flink面试问题总结(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink面试问题总结(1)

1:讲讲flink checkpoint原理?对齐式和非对齐式checkpoint有什么区别?

checkpoint作用?

保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。checkpoint是一种容错恢复机制

checkpoint保存的是什么数据?

当前检查点开始时数据源(例如Kafka)中消息的offset。

记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

checkpoint的执行流程?
1:Checkpoint Coordinator 周期性的向所有 source 节点 trigger Checkpoint
2:source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint
3:当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpointcoordinator
4:下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator
5:最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件

barrier对齐和非对齐的区别?

checkpoint底层逻辑是通过插入序号单调递增的barrier,把无界的数据流划分成逻辑上的数据段,并通过段落标记barrier 来为这些数据段,加持'事务’特性:

每一段数据流要么被完整成功处理;

要么回滚一切不完整的影响(状态变化)

最终需要保证:快照在各个算子间的状态必须统一(必须是经过了相同数据的影响之后的状态值


barrier对齐与barrier不对齐区别在于:

   不对齐:at least once 最少一次,消息不会丢失,但是可能会重复

   对齐:exactly once 精确一次性,但是处理性能会降低。

2:讲讲watermark工作机制?

watermark的意义:

标识 Flink 任务的事件时间进度,从而能够推动事件时间窗口的触发、计算。

解决事件时间窗口的乱序问题。

watermark的触发时机:

1:watermark时间 >= window_end_time 即max(timestamp, currentMaxTimestamp....)-allowedLateness >= window_end_time

2:在[window_start_time,window_end_time)中有数据存在

乱序处理可归纳为:

窗口window 的作用是为了周期性的获取数据。

watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。

allowLateNess是将窗口关闭时间再延迟一段时间。

sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

watermark的分类:

flink1.10以前有以下两种:
方式一:AssignerWithPeriodicWatermarks【1.10及以前的版本,建议使用这种方式】

周期性水位线

周期性的生成 watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间

常用的实现类是:BoundedOutOfOrdernessTimestampExtractor(延时时间)

方式二:AssignerWithPunctuatedWatermarks:

标点水位线

阶段性的生成 watermark,即每来一条数据就生成一个wartermark。这种方式下,窗口的触发与时间无关,而是决定于何时收到标记事件。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。


flink1.11.0及版本以上推荐使用:WatermarkStrategy

方式一:固定乱序长度策略(forBoundedOutOfOrderness)

通过调用WatermarkStrategy对象上的forBoundedOutOfOrderness方法来实现,接收一个Duration类型的参数作为最大乱序(out of order)长度。WatermarkStrategy对象上的withTimestampAssigner方法为从事件数据中提取时间戳提供了接口

一般使用这种策略

方式二:单调递增策略(forMonotonousTimestamps)

通过调用WatermarkStrategy对象上的forMonotonousTimestamps方法来实现,无需任何参数,相当于将forBoundedOutOfOrderness策略的最大乱序长度outOfOrdernessMillis设置为0。

方式三:不生成策略(noWatermarks)

WatermarkStrategy.noWatermarks()


注意点:

多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!

数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着watermarkStrategy也不会获得任何数据去生成watermark,在这种情况下可以通过设置有一个空闲时间(withIdleness),当超过这个时间则将这个分片或分区标记为空闲状态。

watermark对齐参数:WatermarkStrategy .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

3:flink双流join?

1:flink window join

join()

通俗理解,将两条实时流中元素分配到同一个时间窗口中完成Join。两条实时流数据缓存在Window State中,当窗口触发计算时,执行join操作。(窗口对齐才会触发)

支持Tumbling Window Join (滚动窗口),Sliding Window Join (滑动窗口),Session Widnow Join(会话窗口),支持处理时间和事件时间两种时间特征。

源码核心总结:windows窗口 + state存储 + 双层for循环执行join()

val env = ...
// kafka 订单流
val orderStream = ... 
// kafka 订单明细流
val orderDetailStream = ...
orderStream.join(orderDetailStream)
    .where(r => r._1)  //订单id
    .equalTo(r => r._2) //订单id
    .window(TumblingProcessTimeWindows.of(
          Time.seconds(60)))
    .apply {(r1, r2) => r1 + " : " + r2}
    .print()

coGroup()

coGroup的作用和join基本相同,但有一点不一样的是,如果未能找到新到来的数据与另一个流在window中存在的匹配数据,仍会将其输出。

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。

它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,

可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

clickRecordStream
  .coGroup(orderRecordStream)
  .where(record -> record.getMerchandiseId())
  .equalTo(record -> record.getMerchandiseId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        boolean isMatched = false;
        for (OrderDoneLogRecord orderRecord : orderRecords) {
          // 右流中有对应的记录
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
          isMatched = true;
        }
        if (!isMatched) {
          // 右流中没有对应的记录
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
        }
      }
    }
  })
  .print().setParallelism(1);

2:Flink Interval Join

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法

clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  })
  .print().setParallelism(1);

3:Flinksql Regular Join

Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,

Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。-号代表回撤,+号代表最新数据

SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id

因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。

4:flinksql Time-Windowed Join

Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。

以更常用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询如下:

SELECT *
FROM 
  Orders o, 
  Shipments s
WHERE 
  o.id = s.orderId AND
  s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR

5:flinksql 时态表

虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。

Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。


时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。

SELECT * FROM RatesHistory; 
currency_time currency  rate
============= ========= ====
09:00:00      US Dollar 102
09:00:00      Euro      114
09:00:00      Yen       1
10:45:00      Euro      116
11:15:00      Euro      119
11:49:00      Pounds    108
-- 视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。
CREATE VIEW versioned_rates AS              
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件时间
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作为主键
         ORDER BY currency_time DESC) AS rowNum 
      FROM RatesHistory )
WHERE rowNum = 1; 
-- 视图 `versioned_rates` 将会产出如下的 changelog: 
(changelog kind) currency_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      US Dollar  102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      Yen        1
+(UPDATE_AFTER)  10:45:00      Euro       116
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:49:00      Pounds     108

比如典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,需要和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容如下:

SELECT 
  o.amount * r.rate
FROM
  Orders o,
  LATERAL Table(Rates(o.time)) r
WHERE
  o.currency = r.currency

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录都可以与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点以前的记录是不可见的。这意味着我们只需要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。因为 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据可以安全地被清理掉。


时态表注意事项:

Temporal Table 可提供历史某个时间点上的数据。

Temporal Table 根据时间来跟踪版本。

Temporal Table 需要提供时间属性和主键。

Temporal Table 一般和关键词 LATERAL TABLE 结合使用。

Temporal Table 在基于 ProcessingTime 时间属性处理时,每个主键只保存最新版本的数据

Temporal Table 在基于 EventTime 时间属性处理时,每个主键保存从上个 Watermark 到当前系统时间的所有版本

Append-Only 表 Join 右侧 Temporal Table ,本质上还是左表驱动 Join ,即从左表拿到 Key ,根据 Key 和时间(可能是历史时间)去右侧 Temporal Table 表中查询。

Temporal Table Join 目前只支持 Inner Join。

Temporal Table Join 时,右侧 Temporal Table 表返回最新一个版本的数据。

4:讲讲flink状态后端,怎么选择,各有什么优缺点?

flink1.12之前:

MemoryStateBackend(默认使用)      

该持久化存储主要将快照数据保存到 JobManager 的内存中仅适合作为测试以 及快照的数据量非常小时使用,并不推荐用作大规模商业部署

局限性:

每个独立的状态(state)默认限制大小为5MB, 可以通过构造函数增加容量;

状态的大小不能超过akka的framesize大小。参考:配置

聚合状态(aggregate state )必须放入JobManager的内存


适用场景:

本地开发和调试。状态很少的作业,例如仅包含一次记录功能的作业(Map,FlatMap。Filter,...), kafka 的消费者需要很少的状态


FsStateBackend

基于文件系统进行存储, 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

局限性:

由于带备份的状态先会存在 TaskManager 中,故状态的大小不能超过 TaskManager 的内存,以免发生OOM。

适用场景:

FsStateBackend 适用于处理大状态,长窗口,或大键值状态的有状态处理任务。

FsStateBackend 比较适合用于高可用方案

可以在生产环境中使用。


RocksDBState-Backend

RocksDBStateBackend将工作状态保存在RocksDB数据库(RocksDB 是一个基于 LSM 实现的 KV 数据库,所以个人理解State数据部分存储在内存中,一部分存储在磁盘文件上)。

   通过checkpoint, 整个RocksDB数据库被复制到配置的文件系统中。最小元数据保存jobManager的内存中。RocksDBStateBackend可以通过enableIncrementalCheckpointing参数配置是否进行增量Checkpoint(而MemoryStateBackend 和 FsStateBackend不能)。

   跟FsStateBackend 不同的是,RocksDBStateBackend仅支持异步快照(asynchronous snapshots)。

局限性:

由于RocksDB的JNI API 是基于byte[] 的, 故 RocksDB 支持的单 key 和单 value 的大小不能超过 2^31 字节。

对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。

当使用 RocksDB 时,状态大小只受限于TaskManager的磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。使用RocksDB 的权衡点在于所有的状态相关的操作都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。





适用场景:

RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。

RocksDBStateBackend 非常适合用于高可用(HA)方案

RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端增量 checkpoint 非常适用于超大状态的场景。

最好是对状态读写性能要求不高的作业。



flink 1.12之后

HashMapStateBackend内存计算,读写速度非常快

可以支持写入Memory和文件系统

JobManagerCheckpointStorage() : jobmanager内存

JobManagerCheckpointStorage("file://path"): jobmanager本地文件

FileSystemCheckpointStorage(): filesystem中(oss、hdfs等)


局限性:

随着时间不停地增长,会耗尽内存资源


EmbeddedRocksDBStateBackend(同RocksDBState-Backend



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
18天前
|
消息中间件 API 数据处理
Flink常见面试问题(附答案)
Apache Flink是开源的流批处理框架,提供低延迟、高吞吐的数据处理。与Hadoop不同,Flink专注于实时数据流。其核心特性包括事件时间和处理时间的概念,事件时间通过水印处理乱序事件。Flink通过检查点实现容错,支持滚动、滑动和会话窗口进行流数据处理。状态后端用于管理应用程序状态,水印用于处理延迟数据。Flink与Kafka集成能保证事件顺序,支持多种连接器如Kafka、JDBC等。其处理延迟数据、乱序事件的能力,以及Exactly-Once语义,使其在大规模数据处理中具有优势。Flink还支持表格API和DataStream API,以及多种容错和性能优化策略。
115 1
Flink常见面试问题(附答案)
|
14天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
37 0
|
4月前
|
分布式计算 Java API
大数据Flink面试考题___Flink高频考点,万字超全整理(建议)
大数据Flink面试考题___Flink高频考点,万字超全整理(建议)
116 0
|
负载均衡 Linux 应用服务中间件
Linux下各种锁地理解和使用以及总结解决一下epoll惊群问题(面试常考)
Linux下各种锁地理解和使用以及总结解决一下epoll惊群问题(面试常考)
Linux下各种锁地理解和使用以及总结解决一下epoll惊群问题(面试常考)
|
缓存 算法 Java
PHP面试总结
PHP面试总结
112 0
|
存储 资源调度 分布式计算
Flink灵魂17问,最新面试题
Flink灵魂17问,最新面试题
211 0
|
SQL 算法 JavaScript
西安软件园面试总结(一)
西安软件园面试总结(一)
西安软件园面试总结(一)
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1353 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3