50道大数据精选面试题

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云原生内存数据库 Tair,内存型 2GB
简介: 50道大数据精选面试题

hive面试题
kafka面试题
spark面试题
flink面试题
离线数仓面试题

以下是一些常见面试题,不包含数据倾斜,小文件,内存模型与资源分配、性能优化等问题,这些会单独有几篇文章讲解.

hive面试问题?

面试题1:hive中四个by的区别?

1)Sort By:分区内有序;
不是全局排序,其在数据进入reducer前完成排序,也就是说它会在数据进入reduce之前为每个reducer都产生一个排序后的文件。因此,如果用sort by进行排序,并且设置mapreduce.job.reduces>1,则sort by只保证每个reducer的输出有序,不保证全局有序。
2)Order By:全局排序,只有一个 Reducer;
order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个reducer,会导致当输入规模较大时,消耗较长的计算时间
3)Distrbute By:类似 MR 中 Partition,进行分区,结合 sort by 使用。
distribute by是控制在map端如何拆分数据给reduce端的。类似于MapReduce中分区partationer对数据进行分区
hive会根据distribute by后面列,将数据分发给对应的reducer,默认是采用hash算法+取余数的方式。
4)Cluster By:等同于Distribute by + Sort by,只能按默认升序排序
当 Distribute by 和 Sorts by 字段相同时,可以使用 Cluster by 方式。Cluster
by 除了具有 Distribute by 的功能外还兼具 Sort by 的功能。但是排序只能是升序排序,不能
指定排序规则为 ASC 或者 DESC

面试题2:hive中静态分区和动态分区的区别?

静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。 详细来说,静态分区的列实在编译时期,通过用户传递来决定的;动态分区只有在SQL执行时才能决定。 静态分区不管有没有数据都将会创建该分区,动态分区是有结果集将创建,否则不创建。

静态分区 SP(static partition)
  1、静态分区是在编译期间指定的指定分区名
  2、支持load和insert两种插入方式
    2.1load方式
      1)会将分区字段的值全部修改为指定的内容
      2)一般是确定该分区内容是一致的时候才会使用
    2.2insert方式
      1)必须先将数据放在一个没有设置分区的普通表中
      2)该方式可以在一个分区内存储一个范围的内容
      3)从普通表中选出的字段不能包含分区字段
  3、适用于分区数少,分区名可以明确的数据
动态分区 DP(dynamic partition)
  1、根据分区字段的实际值,动态进行分区
  2、是在sql执行的时候进行分区
  3、需要先将动态分区设置打开(set hive.exec.dynamic.partition.mode=nonstrict )
  4、只能用insert方式

面试题3:hive内部表与外部表的区别?使用场景分别是?

区别:
内部表数据由Hive自身管理,外部表数据由HDFS管理;
内部表数据存储的位置是hive.metastore.warehouse.dir。hive自身管理,外部表数据由HDFS管理;
删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除;
对内部表的修改会将修改直接同步给元数据,而对外部表的表结构和分区进行修改,则需要修复。
使用场景:
外部表:相对来说更加安全些,数据组织也更加灵活,方便共享源数据。如果数据的处理由hive和其他工具一起处理,则创建外部表。
内部表:如果所有的数据都由hive处理,则创建内部表。

kafka面试题

面试题4:kafka如何保证数据不丢失?

##broker端:
Topic 副本因子个数:replication.factor >= 3
同步副本列表(ISR):min.insync.replicas = 2
禁用unclean选举:unclean.leader.election.enable=false
##producer端:
#同步方式:
producer.type=sync
request.required.acks=1
副本数量>=2
增加重试次数
#异步方式
producer.type=async 
request.required.acks=1 
queue.buffering.max.ms=5000 
queue.buffering.max.messages=10000 
queue.enqueue.timeout.ms = -1 
batch.num.messages=200
queue.buffering.max.ms=5000
通过buffer来进行控制数据的发送,有两个值来进行控制,缓冲时间阈值与缓冲消息的数量阈值,
如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式
##consumner端:
1:关闭自动 offset,手动提交offset
设置 enable.auto.commit = false , 默认值true,自动提交
使用kafka的Consumer的类,用方法consumer.commitSync()提交
或者使用spring-kafka的 Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)
2:另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,
当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。

面试题5:kafka如何保证数据exactly-once

1) Producer exactly-once
enable.idempotence=true  
分区副本数>= 2
isr >=2
ProducerID+SequenceNumber+Ack=-1(幂等性)
2)Consumer exactly-once
手动维护并提交偏移量。
1:设置enable.auto.commit=false,关闭自动提交偏移量
2:借助外部数据库,如redis的pipeline,mysql的事务机制管理存储偏移量
再同一事物中,在消息被处理完之后在提交偏移量。并更新偏移量。
否则消息需回滚,并获取到上一次偏移量的位置从新进行处理。

面试题6:kafka数据积压怎么解决?

1、增加broker节点,增加分区数量,提高并行度
2、修改单个消费为批量消费
3、增加单线程消费为线程池异步消费
4、缩短批次时间间隔;
5、老版本SparkStreaming控制消费的速率(spark.streaming.kafka.maxRatePerPartition),可以控制最大的消费速率,在参数中设置;新版本设置背压机制实现消费处理的动态平衡。
6.对代码进行优化,尽可能的一次性计算多个结果,减少shuffer过程;
7.处理的结果如果过多,可以将数据保存到MySQL集群、MongoDB集群【支持事物】或ES【不支持事物】,增大吞吐量
8、消费线程将拉取的消息放到一个滑动窗口中,通过滑动窗口控制拉取的速度
9、对于倾斜的key加以处理,加随机数等方式打散

面试题7:kafka reblance发生时机和分区分配策略?

reblance触发时机?
     当出现以下几种情况时,Kafka 会进行一次重新分区分配操作,即 Kafka 消费者端的 Rebalance 操作
     ① 同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
     ② 消费者离开当期所属的 consumer group组。比如 主动停机  或者  宕机
     ③ 分区数量发生变化时(即 topic 的分区数量发生变化时)
     ④ 消费者主动取消订阅
reblance三种策略?
kafka新版本提供了三种rebalance分区分配策略:(partition.assignment.strategy)
    range
    round-robin
    sticky

面试题8:kafka的分区数如何确定?

在partition级别上达到均衡负载是实现吞吐量的关键,合适的partition数量可以达到高度并行读写和负载均衡的目的,需要根据每个分区的生产者和消费者的目标吞吐量进行估计。
可以遵循一定的步骤来确定分区数:根据某个topic日常"接收"的数据量等经验确定分区的初始值,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么
numPartitions = Tt / max(Tp, Tc)
说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用消费消息后进行什么处理的关系更大,相对复杂一些。
分区数过多的危害?
一、客户端/服务器端需要使用的内存就越多
二、文件句柄的开销
三、越多的分区可能增加端对端的延迟
四、降低高可用性

面试题9:数据发往kafka的分区规则?

key 和 value 的类型,一般都用字符串即可。 数据到底写入到哪一个分区中: 
    如果指定了分区,就写入到指定的分区中。 
    如果没有指定分区,指定了 key,按照 key 的 hashcode,取模,写入对应的分区 
    如果没有指定分区和 key,轮询机制

面试题10:kafka producer buffer pool的作用?

Kafka通过使用内存缓冲池的设计,让整个发送过程中的存储空间循环利用,有效减少JVM GC造成的影响,从而提高发送性能,提升吞吐量。

面试题11:kafka时间轮的作用?

kakfa通过时间轮来处理延迟任务,只将时间轮的槽保存到延迟队列,大大的减少了延迟队列的元素数量,这样对于元素的增加删除性能有很大提高;
kafka通过阻塞的方式poll延迟队列的,减少了大量的空转;
为了保证线程安全,灵活运用读写锁、原子对象、synchronized控制时间轮的操作;

面试题12:kafka为什么这么快?

spark面试问题?

面试题13::Spark为什么比mapreduce快?

这是一道常见的面试题,回答时可以从IO、shuffle与排序、资源、部署模式、内存管理策略等各个方面来回答。
1:MR基于磁盘的分布式计算引擎,频繁的磁盘IO。spark基于内存进行计算,DAG计算模型,大大减少了磁盘IO。
2:spark多线程运行,MR多进程运行
3:spark粗粒度资源申请,MR细粒度资源申请
4:spark支持多种部署模式,MR只支持yarn上部署
5:shuffle与排序;MR有reducer必排序,一般会经过3次排序,Spark Shuffle数据的排序操作不是必须的。
                  spark有多种shuffle类型,spark不一定会发生shuffle,MR一定会发生shuffle
6:spark具有灵活的内存管理策略

面试题14:spark Repartition和Coalesce 的关系与区别,能简单说说吗?

1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

面试题15:简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系?

关于Spark缓存和检查点的区别,大致可以从这4个角度去回答:
位置
Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS
生命周期
Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法,Checkpoint永久存储不会被删除。
RDD依赖关系
Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链。
执行与使用
persist 中RDD的逻辑只会执行一次,而checkPoint会执行两次
生产环境中一般都是cache和checkPoint连用,这样RDD逻辑只会执行一次,并且会缓存到checkPoint中

面试题16:spark on yarn client模式与cluster的区别?

1:driver所在位置不同。
client模式下driver线程只在spark-submit命令提交的机器上。
cluster模式下,driver线程只在applicationMaster所在的节点。
2:启动的任务进程名字不一样。
client模式下:ExecutorLanucher 只负责向yarn申请容器来启动executor
cluster模式下,applicationMaster既要负责申请运行executor的资源,又要调Driver线程来做task调度。

面试题17:spark rdd、dataframe、dataset的区别与联系?

三者的共性:
1. RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
2. 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
3. 三者有许多共同的函数,如filter,排序等;
4. 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入);
5. 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出;
6. 三者都有partition的概念;
7. DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型。
三者的区别:
1) RDD:
=> RDD 一般和spark mllib同时使用
=> RDD不支持sparksql操作
2) DataFrame:
=> 与RDD 和 DataSet不同,DataFrame每一行的类型固定为Row, 每一列的值没法直接访问,只有通过解析才能获取各个字段的值。
=> DataFrame 与 DataSet 一般不与spark mllib同时使用。
=> DataFrame 与 DataSet 均支持SparkSQL的操作,比如select,groupby 之类,还能注册临时表/视窗,进行sql语句操作。
=> DataFrame 与 DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
2) DataSet:
=> DataSet 和 DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame其实就是DataSet的一个特例。 type DataFrame = DataSet[Row]
=> DataFrame也可以叫DataSet[Row],每一行类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面的getAs方法或者共性中的第七条提到的模式匹配拿出特定字段,而DataSet中,每一行是什么类型是不一定的,在自定义case class之后可以很自由的获取每一行的信息。
三者的转换
1) RDD转DataFrame
方案一:直接将字段名称传入toDF中       .toDF(col1,col2...)
方案二:通过反射的方式   
java:  .createDataFrame(RDD1,JavaBean.class);
scala: 通过case class 如:
 spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0),x(1).trim.toLong)}).toDF()
方案三:构造Schema的方式   .createDataFrame(rdd,scheme)
2)DataFrame转RDD
df.rdd或者df.javaRDD()
3)RDD转DataSet
方案一:使用toDS()算子,需要导入隐式转换(import spark.implicits._)
方案二:使用spark.createDataset(rdd)
4) DataSet转RDD
直接使用.rdd
5)DataFrame转DataSet
封装样例类,调用df.as[xxx]
case class xxx()
df.as[xxx]
6) DataSet转DataFrame
ds.toDF()

面试题18:updateStateByKey与mapWithState 使用区别?

updateStateByKey :统计全局的key的状态,就算没有数据输入,它也会在每一个批次的时候返回之前的key的状态。
缺点:若数据量太大的话,需要checkpoint的数据会占用较大的存储,效率低下。
mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。效率更高,生产中建议使用
优点:我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储

面试题19:spark sql三种join方式?

Broadcast Hash Join:适合一张很小的表和一张大表进行Join;
Shuffle Hash Join:适合一张小表(比上一个大一点)和一张大表进行Join;
Sort Merge Join:适合两张大表进行Join;
shuffle Hash Join策略必须满足以下条件:
1.仅支持等值 Join,不要求参与 Join 的 Keys 可排序(这点是和sort-merge join相对应)
2.spark.sql.join.preferSortMergeJoin 参数必须设置为 false,参数是从 Spark 2.0.0 版本引入的,默认值为true,也就是默认情况下选择 Sort Merge Join;
3.小表的大小(plan.stats.sizeInBytes)必须小于 spark.sql.autoBroadcastJoinThreshold *spark.sql.shuffle.partitions(默认值200)其实就是让每一个小表的分区都类似于广播变量的小表;
4.而且小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小(stats.sizeInBytes),也就是a.stats.sizeInBytes * 3 < = b.stats.sizeInBytes
Broadcast Hash Join 策略必须满足以下条件:
1、小表的数据必须很小,可以通过 spark.sql.autoBroadcastJoinThreshold 参数来配置,默认是 10MB;
2、如果内存比较大,可以将阈值适当加大;
3、将 spark.sql.autoBroadcastJoinThreshold 参数设置为 -1,可以关闭这种连接方式;
4、只能用于等值 Join,不要求参与 Join 的 keys 可排序 。
要启用 Shuffle Sort Merge Join 必须满足的条件是仅支持等值 Join,并且要求参与 Join 的 Keys 可排序。

面试题20:RDD有什么缺陷?

1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的 所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是 说可以一条条的读 
2)不支持增量迭代计算,Flink支持

面试题21:groupByKey和reduceByKey区别?

reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的 数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较 高。 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚 合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那 么还是只能使用 groupByKey

面试题22:RDD的弹性表现在哪几点?

1)自动的进行内存和磁盘的存储切换;
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG task调度和资源无关;
7)数据分片的高度弹性。

面试题23、RDD通过Linage(记录数据更新)的方式为何很高效?

1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且RDD之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就产生新的rdd,
不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯 900步是上一个stage的结束,要么就checkpoint。
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,
写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景如网络爬虫,现实世界中,大多数写是粗粒度的场景。

面试题24:spark3.0 AQE新特性?

自动分区合并
自动数据倾斜处理
Join 策略调整
详细见:https://zhuanlan.zhihu.com/p/622617762

面试题25:spark Hash shuffle与Sort shuffle的区别?

Hash shuffle:一种是普通运行机制,另一种是合并的运行机制。

产生的磁盘小文件的个数为maptask*reducetask 每个分区是一个task 磁盘小文件多,I/O增多,产生的GC会增多。 这种shuffle产生的磁盘小文件,容易导致OOM

这种模式不单单产生的磁盘小文件比较多,而且占用内存也比较多。 我们应该降低这种磁盘之间的接触。 Hash shuffle的优化机制

启动HashShuffle的合并机制ConsolidatedShuffle的配置:spark.shuffle.consolidateFiles=true 两个task共用一个buffer缓冲区

如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

sort shuffle:

Spark1.6之前用hash shuffle,在spark1.6之后使用sort shuffle Sort shuffle的两种机制: 估算,去要内存5.01*2-5 要不到的时候就去排序 最终溢写的小的磁盘小文件合并成为了一个大的磁盘小文件 当不需要排序的时候,默认使用Bypass机制

bypass运行机制的触发条件: Shuffle reduce task 数量小于spark.shuffle .sort.bypassMerge Threadshold参数的值小于200,不开启,溢写磁盘不需要排序,小于等于的时候是开启的。

不是聚合类的shuffle算子(比如reduceByKey)。

hash shuffle(合并运行机制)优化机制产生的磁盘小文件的个数:C*R(core*reducer)
Hash shuffle(普通):产生的磁盘小文件:M*R
Sort shuffle产生的磁盘小文件的个数为:2*M
Bypass机制产生的磁盘小文件的个数为:2*M

面试题26、哪些Spark算子会有shuffle过程

去重:distinct
排序:groupByKey ,reduceByKey,sortByKey
重分区:repartition、repartitionAndSortWithinPartitions、coalesce
集合或者表连接操作:join,cogroup

flink面试题?

27: flink中什么时候形成算子链,什么时候断链。

## 形成算子链的条件:
上下游的并行度一致(槽一致)
该节点必须要有上游节点跟下游节点;
下游StreamNode的输入StreamEdge只能有一个) 
上下游节点都在同一个 slot group 中(下面会解释 slot group) 
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 
上下游算子之间没有数据shuffle (数据分区方式是 forward) 
用户没有禁用 chain 
## 禁用算子链的场景
某个算子需要单独设置资源:当某个算子需要单独设置资源时,比如说 Memory、CPU 等,这个算子就不能被放置在算子链里面,需要单独成为一个 Task。(背压时定位问题)
需要等待外部事件触发:某些算子需要等待外部事件触发才能继续处理数据,例如读取外部文件或者接收网络消息等,这时候算子如果被放到算子链里面,则整个链都会阻塞,产生性能问题,因此需要禁用算子链。
处理时间窗口非常大的数据集:对于非常大的数据集,特别是在窗口结束时间很大的情况下,算子链可能会消耗太长的时间,导致超时或者OOM错误。禁用算子链可以避免此类问题。
需要流数据处理与批数据处理共存:如果同时需要进行流式与批处理,禁用算子链可以让流和批处理同时运行,避免出现串行化的问题。
...

28:flink keyby之后出现数据倾斜的原因是什么?如何定位和解决的?

keyby之后出现数据倾斜常见原因?
Key的选择不合适:如果选择的Key不平衡或者有明显的热点数据,就容易出现数据倾斜的问题。应该尝试选择更加平衡的Key,例如多个属性组合的方式。
数据分布不均匀:有些数据在时间、空间上分布不均匀,导致某些Key的数据量比其他Key大很多。可以通过统计每个Key对应的数据量,找到数据分布不均匀的原因。
算子链长/复杂度高:当算子链过长或者算子的操作很复杂时,也容易导致某些Task的数据处理量过大。可以通过拆分算子链、优化算子操作等方式来解决。
并行度设置不当:并行度过高可能导致资源浪费,过低则会导致数据倾斜。应该根据实际情况,合理设置并行度。
定位:
步骤1:定位反压
定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask。
步骤2:确定数据倾斜
Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。
解决方案:
keyBy后聚合操作存在数据倾斜(通过Flink LocalKeyBy思想来解决)
在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似 MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
注意:Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口
的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来一条
处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,数据
量并没有减少,且结果重复计算(非 FlinkSQL,未使用回撤流)
keyBy后窗口聚合操作存在数据倾斜(两阶段聚合)
因为使用了窗口,变成了有界数据的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
实现思路:
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
第二阶段聚合:去掉随机数前缀或后缀,按照原来的 key 及 windowEnd 作 keyby、聚合

29:flink taskmanger slot jobmanger 并行度之间资源怎么分配的?

## taskmanager、slot、并行度之间的关系
在Yarn集群中Job分离模式下,Taskmanger的数量=ceil(slot数量/并行度)。slotNumber>=taskmanger*并行度
## TaskManager/slots与cpu的关系
经验上讲Slot的数量与CPU-core的数量一致为好。但考虑到超线程,可以让slotNumber=2*cpuCore。
## slot与并行度
一般我们设置task的并行度不能超过slot的数量。
一个Task的并行度等于分配给它的Slot个数(前提槽资源充足)。

30:flink application、session、pre-job模式的区别和使用场景?

application:每个job独享一个集群,job退出则集群退出。main方法在集群上运行。
session:多个job共享集群资源,job退出集群也不会退出。main方法在客户端运行。
pre-job:每个job独享一个集群,job退出则集群退出。main方法在客户端运行。
适用场景:
Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业,需要频繁提交小job的场景。
Per-Job模式一般用来部署那些长时间运行的作业
Application模式综合了两种模式的所有优点,建议生产上适用。

31:讲讲flink checkpoint原理?对齐式和非对齐式checkpoint有什么区别?

checkpoint作用?
保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。checkpoint是一种容错恢复机制
checkpoint保存的是什么数据?
当前检查点开始时数据源(例如Kafka)中消息的offset。
记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
Checkpoint有两种实现方式:对齐式(Aligned Checkpoint)和非对齐式(Unaligned Checkpoint)。
对齐式Checkpoint:
 计算所有执行中的任务完成当前状态后,最终整个程序的一个完整状态。
 取得一个全局会话锁,暂停所有输入数据源的操作,等待所有任务的结果输出。
 对任务进行Barrier插入,通过Barrier Barrier来将任务切分成Snapshotable 和 Non-Snapshotable 两类任务。Snapshotable任务需要将其状态发送到其他TaskManager进行二次备份,而Non-Snapshotable任务则不需要。
 在所有任务都完成Snapshotable操作之后,JobManager根据接收到的各个任务的实际状态,重新计算出恢复点位置。
 恢复以该状态为准的下一个CheckPoint,之前的CheckPoint使用完毕并且作废。
非对齐式Checkpoint:
 每个任务在被触发Checkpoint时,都记录下自己当前的状态信息。
 在所有任务完成状态保存之后,JobManager会选择其中任意一个Checkpoint作为重启点。当它恢复时,每个任务将自己记录的状态发送给它所属的Operator进行恢复。
 非对齐式Checkpoint允许各个Task在不同的时间点异步进行Checkpoint操作,节省了执行任务的总体时间。
对齐式Checkpoint可以保证所有任务的状态是一致的,但是需要等待所有任务都完成Checkpoint后才能进入下一个Checkpoint,因此会影响整个应用程序的处理速度,exactly once 精确一次性支持。
而非对齐式Checkpoint则可以保证任务相互之间的状态是独立的,每个任务在自己的频率上异步进行Checkpoint,可以大大提高系统的可扩展性和容错性,支持at least once语义 最少一次,消息不会丢失,但是可能会重复。
在Flink中,Checkpoint除了帮助我们实现分布式快照外,还可以通过控制Checkpoint的间隔、最大并发数、存储位置等参数来优化应用程序的性能,并确保应用程序的容错性。

32:讲讲watermark工作机制?

## watermark的意义:
标识 Flink 任务的事件时间进度,从而能够推动事件时间窗口的触发、计算。
解决事件时间窗口的乱序问题。
## watermark的触发时机:
1:watermark时间 >= window_end_time 即max(timestamp, currentMaxTimestamp....)-allowedLateness >= window_end_time
2:在[window_start_time,window_end_time)中有数据存在
## 乱序处理可归纳为:
窗口window 的作用是为了周期性的获取数据。
watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
allowLateNess是将窗口关闭时间再延迟一段时间。
sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

33:flink双流join?

1:flink window join
join()
通俗理解,将两条实时流中元素分配到同一个时间窗口中完成Join。两条实时流数据缓存在Window State中,当窗口触发计算时,执行join操作。(窗口对齐才会触发)
支持Tumbling Window Join (滚动窗口),Sliding Window Join (滑动窗口),Session Widnow Join(会话窗口),支持处理时间和事件时间两种时间特征。
源码核心总结:windows窗口 + state存储 + 双层for循环执行join()
coGroup()
coGroup的作用和join基本相同,但有一点不一样的是,如果未能找到新到来的数据与另一个流在window中存在的匹配数据,仍会将其输出。
只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。
它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,
可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。(二重循环)
2:Flink Interval Join
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。 interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。
如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。
3:Flinksql Regular Join
Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,
Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。-号代表回撤,+号代表最新数据
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。
4:flinksql Time-Windowed Join
Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。
5:flinksql 时态表
虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。
Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。
时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。

34: 讲讲flink状态后端,怎么选择,各有什么优缺点?

Flink状态后端是指用来存储和管理分布式数据流应用程序的状态数据的底层系统。Flink提供了多种状态后端实现,包括内存状态后端、文件系统状态后端和RocksDB状态后端。
内存状态后端
内存状态后端是默认的状态后端,使用哈希表来管理状态,并将所有状态存储在JVM堆中。
优点:速度快、延迟低,适合用于非常小的状态或者需要最佳性能的应用场景。
缺点:容量有限,无法处理大量状态或长时间运行的应用程序,同时,当任务发生故障时,由于状态存储在堆中,可能会导致状态丢失。
文件系统状态后端
文件系统状态后端利用Hadoop的文件系统(HDFS)来存储状态信息。
优点:具有较高的容量和可扩展性,支持持久化状态和快速恢复,适合用于高可用方案。
缺点:延迟较高,不适合对状态的实时操作。
RocksDB 状态后端
将工作状态保存在RocksDB数据库(RocksDB 是一个基于 LSM 实现的 KV 数据库,所以个人理解State数据部分存储在内存中,一部分存储在磁盘文件上)。
优点:可以高效地管理大规模状态数据,非常适合在长时间运行的应用程序中使用。RocksDB状态后端比文件系统状态后端具有更好的性能和容量,RocksDB状态后端具有高可用性,并支持快速故障恢复。并且支持增量checkpint,支持长窗口大键控状态,适合用于高可用方案。
缺点:需要更多的资源支持。
选择哪种状态后端,需要根据业务场景和要求进行评估。一般来说,如果关注的是低延迟和高性能,那么使用内存状态后端或者RocksDB状态后端会是比较好的选择;如果注重可靠性和可扩展性,则可以选择文件系统状态后端。在实际的使用中,还可以通过配置Flink的参数调整状态后端的功能、容量以及性能表现等方面,以达到更优的效果。

35:flink 维表关联的各种方案和优缺点?

1:预加载维表
通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在probe流map()方法中与维表数据进行关联。RichMapFunction中open方法里加载维表数据到内存的方式特点如下:
优点:实现简单。
缺点:因为数据存于内存维度信息全量加载到内存中,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。
2:热存储维表(look up join)
这种方式是将维表Redis、Hbase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:
优点:维度数据量不受内存限制,可以存储很大的数据量
缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。
3:广播维表
利用Flink的Broadcast State将维度数据流广播到下游做join操作。特点如下:
优点:维度数据变更后可以即时更新到结果中。
缺点:数据保存在内存中,支持的维度数据量比较小。
使用:
1.将维度数据发送到Kafka作为流S1。事实数据是流S2。
2.定义状态描述符MapStateDescriptor,如descriptor。
3.结合状态描述符,将S1广播出去,如S1.broadcast(descriptor),形成广播流(BroadcastStream) B1。
4.事实流S2和广播流B1连接,形成连接后的流BroadcastConnectedStream BC。
5.基于BC流,在KeyedBroadcastProcessFunction/BroadcastProcessFunction中实现Join的逻辑处理。
4:异步IO+guava
异步IO主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题
使用Aysnc I/O的前提条件
1)为了实现以异步I/O访问数据库或K/V存储,数据库等需要有能支持异步请求的client;若是没有,可以通过创建多个同步的client并使用线程池处理同步call的方式实现类似并发的client,但是这方式没有异步I/O的性能好。
2)AsyncFunction不是以多线程方式调用的,一个AsyncFunction实例按顺序为每个独立消息发送请求;
Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,不过目前很多系统都支持异步IO客户端。但是如果使用异步就要涉及到三个问题:
超时:如果查询超时那么就认为是读写失败,需要按失败处理;
并发数量:如果并发数量太多,就要触发Flink的反压机制来抑制上游的写入;
返回顺序错乱:顺序错乱了要根据实际情况来处理,Flink支持两种方式:允许乱序、保证顺序
使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用guava Cache
优点:维度数据不受限于内存,支持较多维度数据
缺点:需要热存储资源,维度更新反馈到结果有延迟(热存储导入,cache)
适用场景:维度数据量大,可接受维度更新有一定的延迟。
5:Temporal table function Join
Temporal table是持续变化表上某一时刻的视图,Temporal table function是一个表函数,(历史表)
传递一个时间参数,返回Temporal table这一指定时刻的视图。
可以将维度数据流映射为Temporal table,主流与这个Temporal table进行关联,可以关联到某一个版本(历史上某一个时刻)的维度数据。
优点:维度数据量可以很大,维度数据更新及时,不依赖外部存储,可以关联不同版本的维度数据。
缺点:只支持在Flink SQL API中使用。

36:flink异步io?

Flink异步I/O是一种高效的 I/O 方案,通过异步方式(非阻塞)实现了 I/O 操作和计算之间的解耦,避免了等待I/O操作完成所造成的阻塞。这种技术常用于大量网络I/O请求高并发的场景,如数据流处理和批处理。
在Flink中,异步I/O主要用于维表关联操作,通常使用异步database client(如AsyncMySQLClient)从外部存储系统中查询数据,并将查询到的结果与DataStream进行join操作。相对于同步I/O,异步I/O可以充分利用后台线程的资源,提高了系统的吞吐量和响应性能。
下面是 Flink 异步 I/O 的优势和使用方法:
优势:
非阻塞:异步执行I/O操作,不会阻塞计算操作;
高吞吐量:能够处理大量并发的I/O请求;
低延迟性:不需要等待I/O操作完成,有助于缩短处理时间;
减少CPU占用:异步I/O可以释放CPU资源,提高了系统的效率。
使用方法:
实现 AsyncFunction 接口:需要自己实现AsyncFunction接口,并在其中编写异步I/O的逻辑代码。
使用 AsyncDataStream API:Flink提供了 AsyncDataStream 类作为支持异步I/O的扩展,可以通过该类将异步I/O集成到DataStream API中。
配置 AsyncWaitOperator:在异步 I/O 的数据处理过程中,Flink提供了 AsyncWaitOperator 作为一个Buffer,用于缓存等待异步操作的结果。
需要注意的是,在使用Flink异步I/O时,需要细心处理异常和超时的情况,避免因为异步I/O导致整个应用程序崩溃。另外也要根据具体情况,合理调节AsyncWaitOperator的大小以及并发度等参数,以提高异步I/O的效果。

37: flink savepoint与checkpoint的应用和区别?

Flink中的Checkpoint和Savepoint是两种不同的机制,它们都用于容错,并且在恢复失败任务时起到了重要作用,但二者有着不同的应用场景和使用方法。
Checkpoint
Checkpoint是Flink中提供的一种机制,用于防止数据流处理过程中的数据丢失或错误。Checkpoint会将当前状态的快照备份到持久化存储中,并在发生故障时,使用这些快照来重新启动任务。Checkpoint机制需要开启后才能使用,并且通过设置间隔时间和最大并发数等参数进行配置。
Savepoint
Savepoint是Flink中提供的一种手动触发的机制,用于保存任务当前状态的一个快照,并可以在之后的时间点重新启动任务,或在另一个集群上恢复该状态。与Checkpoint不同,Savepoint是由用户主动触发的,用户可以选择在任何时候保存任务状态。这种机制常用于灰度升级或版本升级后的任务恢复。
区别:
Checkpoint是自动触发的,而Savepoint是手动触发的;
Checkpoint会定期进行,在任务执行过程中会多次生成checkpoint,而Savepoint只在手动发起时生成一次;
在任务停止时,Checkpoint产生的状态快照会被删除,而Savepoint生成的状态快照则会被保留以供后续使用或版本升级;
在任务恢复时,Checkpoint是在本地文件系统或分布式存储中进行,而Savepoint可以在另一个集群或不同的版本上进行。
总的来说,Checkpoint和Savepoint都是Flink中用于容错的机制。Checkpoint用于防止数据丢失或错误,而Savepoint则常用于灰度升级、版本升级等场景下生成状态备份。因此,在实际应用中需要根据具体需求选择合适的机制。

38:flink 背压原理?定位与解决方式?反压定位:

通过 Flink Web UI 自带的反压监控面板;
通过 Flink Task Metrics。
使用火焰图、线程 Dump、CPU Profiler分析:

监控面板:

backpressure Tab页面:backpressure status 和backpressured/Idle/busy idleTimeMsPerSecond busyTimeMsPerSecond backPressuredTimeMsPerSecond

Flink Task Metrics:① floatingBuffersUsage 为高则表明反压正在传导至上游。

② exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。

使用火焰图、线程 Dump、CPU Profiler分析:使用方法和说明详见:

官网:/nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/ops/debugging/flame_graphs/
微信公众号文章:https://mp.weixin.qq.com/s/6GXuL7WcWhTu2Aq_k2vqAw

常见背压解决方式:

增加资源
通过增加资源(例如 TaskManager、CPU、内存等)的方式,来提高整个系统的处理能力,从而降低背压的风险。需要注意的是,增加资源是一种比较暴力的解决方式,并非所有情况都适用。
调整拓扑结构
通过调整 Flink 的拓扑结构,使得数据流更加合理地流动,从而减少背压的出现。具体来说,可采取以下措施:
 增加缓存队列的长度,以容纳更多的未处理数据。
 优化算子之间的并行度数量,避免出现单节点瓶颈。上下游并行度保持一致,合并算子链,使用共享资源槽位组。
 使用窗口(Window)或 State 来帮助管理状态,降低内存占用率。
 对于大规模任务,可以将任务拆分成多个小任务,以减少单个算子积压数据的风险。
 checkpoint相关优化
数据倾斜解决
  keyby前:把数据进行打散,重新均匀分配。
         通过调整并发度,解决数据源消费不均匀或者数据源反压的情况
 keyby之后:LocalKeyBy思想,本地聚合攒批后发往下游
        两阶段聚合。要获取 WindowEnd 作为窗口标记作为第二阶段分组依据(key+WindowEnd分组)
外部组件交互
如果发现我们的 Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检查第三方组件是否遇到瓶颈,还有就是做维表 join 时的性能问题。 
例如:
 1)Kafka 集群是否需要扩容,Kafka 连接器是否并行度较低;
 2)HBase 的 rowkey 是否遇到热点问题,是否请求处理不过来;
 3)ClickHouse 并发能力较弱,是否达到瓶颈。
关于第三方组件的性能问题,需要结合具体的组件来分析,最常用的思路:
1)异步 io+热缓存来优化读写性能
2)先攒批再读写
维表 join 的合理使用与优化

39:flink数据倾斜怎么定位?怎么处理?flink 数据倾斜的常见处理方式

40:flink去重方案?

1:mapState/ValueState+状态后端
使用RocksDBStateBackend,因为数据是存储在磁盘上,元数据保存在内存
                中。适合非常大的状态。在算子中,使用MapState数据结构,对key进行保存。
                数据来了查看MapState是否存在,存在 + 1,不存在设置为1。
       缺点:
   如果使用机械硬盘的话,flink数据量过大,磁盘会成为性能瓶颈。
   随之导致整个IO急剧下降。可能会出现背压情况!
      优点:精确去重
2:基于HyperLogLog:
HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少空间占用,在不要求100%准确的计数场景极为常用
优点:高效,占用空间少
缺点:近似去重
3:布隆过滤器+状态后端/布隆过滤器+redis
   类似Set集合,用于判断当前元素是否存在当前集合中。
  布隆过滤器,当前的key是否存在容器中,不存在直接返回
    缺点:
    不能百分之百的保证精确。
    优点:
    插入和查询效率是非常的高
4:基于BitMap
用一个bit位来标记某个元素对应的Value,而Key即是该元素。由于采用了Bit为单位来存储数据,因此可以大大节省存储空间。
5:基于外部数据库
选择使用Redis或者HBase存储数据,我们只需要设计好存储的Key即可,不需要关心Flink任务重启造成的状态丢失问题
6:RoaringBitmap去重(推荐)
BitMap - 优点:精确去重,占用空间小(在数据相对均匀的情况下)。 缺点:只能用于数字类型(int或者long)。
RoaringBitmap:BitMap固然好用,但是对去重的字段只能用int或者long类型;但是如果去重字段不是int或者long怎么办呢?那我们就构建一个字段与BitIndex的映射关系表,通过MapFunction拿到字段对应的BitIndex之后,就可以直接进行去重逻辑了。
7:flink+starrocks/hudi(推荐)
通过starrocks和hudi的主键直接去重
优点:高效快速去重
缺点:超大规模数据性能待验证

41:flink如何保证端到端的exactly-once ?

不能百分之百保证exactly-once,只能尽可能的保证。需从每个阶段保证。
source端保证:使用可以记录数据位置并重设读取位置的组件(如kafka,文件)
flink内部保证:使用checkpint+state 将状态值保存在状态后端里,并且checkpoint需要设置为精确一次性语义
sink端保证:从故障恢复时,数据不会重复写入外部系统(幂等写入、事务写入)
幂等写入:幂等操作是指,同一个操作,可以执行很多次,但是不会对结果造成影响,与执行一次的结果保持一致
事务写入:在CheckPoint开始构建一个事务,当CheckPoint彻底完成时,提交事务。
事务写入又可以分为两种---WAL预写日志和2pc两阶段提交。DataStream API 提供了GenericWriteAheadSink模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。

42: flink两阶段提交?

两阶段提交流程:
1. jobMaster 会周期性的发送执行checkpoint命令(start checkpoint);
2.当source端收到执行指令后会产生一条barrier消息插入到input消息队列中,当处理到barrier时会执行本地checkpoint, 并且会将barrier发送到下一个节点,当checkpoint完成之后会发送一条ack信息给jobMaster ;
3. 当DAG图中所有节点都完成checkpoint之后,jobMaster会收到来自所有节点的ack信息,那么就表示一次完整的checkpoint的完成;
4. JobMaster会给所有节点发送一条callback信息,表示通知checkpoint完成消息,这个过程是异步的,并非必须的,方便做一些其他的事情,例如kafka offset提交到kafka。
Flink将两阶段提交协议中的通用逻辑抽象为了一个类—TwoPhaseCommitSinkFunction。
我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:
beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。
preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。
commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。
abort:中止时,将临时文件删除。
如果出现任何故障,Flink将应用程序的状态恢复到最近一次成功的Checkpoint。如果故障发生在预提交成功之后,但还没来得及通知JobManager之前,在这种情况下,Flink会将operator恢复到已经预提交但尚未提交的状态。

43:Operator State与keyed State的区别?

44:任务链(Operator Chains)和 SlotSharing(子任务共享)有什么区别?

任务链(Operator Chains)和 SlotSharing(子任务共享)是 Flink 中用于优化数据流任务的两种机制,二者有着不同的作用和特点。
任务链(Operator Chains)是指把多个算子(operator)连接起来形成一个链式结构,将它们放入同一个线程中执行,从而减少了线程间的通信和序列化开销。在任务链中,一旦开始执行,就会生成一个长时间运行的计算单元,可以有效地提高任务的性能。任务链适用于那些需要频繁传输数据、中间结果计算复杂的场景,在这些场景下,使用任务链可以减少任务之间的通信开销,提高运行效率。
SlotSharing(子任务共享)是指在一个 TaskManager 上启动多个任务(subtask),并将这些任务分配到同一个 slot 中运行。每个 slot 有自己的资源限制和硬件资源,每个任务可以独立地调整资源配置。在 SlotSharing 中,多个任务共享同一个物理节点(TaskManager),可以有效地提高集群的资源利用率。SlotSharing 适用于那些需要大量并发处理的场景,在这些场景下,SlotSharing 可以最大限度地利用资源和加速任务执行。
总的来说,任务链和 SlotSharing 都是 Flink 中用于优化数据流任务的重要机制,但它们的应用场景和特点并不相同。在实际应用中,需要根据具体的需求选择合适的优化机制,以提高任务执行效率和系统的稳定性。

离线数仓常见面试问题?

面试题45:谈谈你对宽表的理解,以及企业建设中宽表的优缺点?

宽表一般是指基于相同粒度的数据进行拉宽整合的表,其中每一行包含多个相关的属性特征或特征,可以分为明细宽表和聚合宽表。相比于传统的长表,宽表具有更好的可读性、可维护性和数据库利用率。
在企业建设过程中,宽表的优点主要包括:
1:数据整合性好:宽表的结构使得不同来源的数据可以更方便地整合在一起,方便进行跨系统、跨部门的数据分析和决策。
2:数据冗余性低:宽表通过将多个属性或特征存储在同一行中,避免了数据的冗余存储,降低了存储和维护的成本。
3:数据查询效率高:宽表的结构有利于数据的快速查询和聚合,减少了多表数据关联的频次,提高了数据的利用率和分析效率。
然而。宽表也存在一些缺点,主要包括:
1:数据结构复杂:宽表的结构相对长表更为复杂,需要更多的数据处理和整合工作,增加了数据处理的难度和成本。
2:数据规模较大:由于宽表需要存储多个维度的数据,数据规模通常较大,需要更多的存储和计算资源。
3:可能存在数据一致性问题:由于宽表中不同属性或特征的更新可能不是同步进行的,来源于不同的业务系统。可能会导致数据的不一致性,需要采取一些措施进行数据一致性的维护。
因此,在企业建设中,宽表的使用需要根据具体情况进行权衡和选择,综合考虑数据的规模、复杂度、资源效率、使用场景等因素,以达到更好的数据分析和决策效果。

面试题46:星型模型、雪花模型的优缺点?以及使用场景?

雪花模型去除了冗余,设计复杂,可读性差,关联的维度表多,查询效率低,但是可扩展性好。
星型模型冗余度高,设计简单,可读性高,关联的维度表少,查询效率高,可扩展性低。
1:查询性能:雪花模型维度表、事实表之间的联接很多,性能比较低;星型模型数据冗余存储所以很多统计查询不需要做外部的连接。
2:模型复杂度:星型架构更简单。雪花模型数据模型的业务层级是由一个不同维度表主键-外键的关系来代表的。而在星形模型中,所有必要的维度表在事实表中都只拥有外键。
3:层次概念:雪花型架构更加贴近OLTP系统的结构,比较符合业务逻辑,层次比较清晰。
4:存储空间:雪花模型使用的是规范化数据,不会产生冗余数据,能够减少数据量,而相比之下星型架构会产生数据冗余。
5:ETL处理:雪花模型由于附属模型的限制,ETL相对复杂,不能并行化。星形模型加载维度表,不需要再维度之间添加附属模型,因此ETL就相对简单,而且可以实现高度的并行化。
总结:适用场景:雪花模型更加适合维度分析的场景,星型模型更加适合指标分析的场景。
根据我们的项目经验,一般建议使用星型架构。因为我们在实际项目中,往往最关注的是查询性能问题,至于磁盘空间一般都不是问题。当然,在维度表数据量极大,需要节省存储空间的情况下,或者是业务逻辑比较复杂、必须要体现清晰的层次概念情况下,可以使用雪花型维度。

面试题47:每天需要全量的删除和插入,数据量比较大,中间会有几分钟的空白时间,在这个期间可能会有人来访问这个数据,会出现查询不到数据的情况,怎么解决这个问题呢?

针对这个问题,可以考虑采用以下两种方案:
1:使用临时表:在每次全量更新前,先创建一张临时表,将新的全量数据插入到这张临时表中,再将主表中的数据全部删除,最后将临时表中数据插入到主表中。这样可以避免在更新过程中出现查询不到的情况,因为在更新期间,主表中的数据并没有被删除,只是被替换了。在更新完成后,再将临时表数据删除即可。
2:使用分区表:将数据按照时间分成多个分区表,每次全量更新时只更新最新的分区,其它分区不受影响。这样可以避免在更新过程中出现查询不到的情况,其它分区数据仍在。同时,分区表的查询性能也比较好,可以提高查询效率。
需要注意的是,这两种方案都需要在更新过程中禁止其它用户对主表进行查询,开通通过锁表或其它机制来实现。另外,在更新过程中需要保证数据的一致性和完整性,可以使用事务来保证更新操作的原子性。

面试题48:仓库中如何保证指标的准确性?

可以从事前,事中,事后三个方面来讲。
事前规范定义、统一计算口径、理解业务、保证业务逻辑正确
事中通过数据质量监控规则和数据血缘发现异常发生的节点和影响的库表,逻辑复核、代码审查,规范建模。
事后和业务人员确认有无异常,建立巡检机制,数据回溯和补数机制。建立相应的监控指标(波动范围、异动率...等指标)

面试题49:数据模型设计好以后,上游业务侧增加、修改或删除字段,该怎么处理?

事前:与上游建立知会机制与协同流程,及时同步业务与模型变更;接管 ODS 层,控制源头,ODS 是业务数据进入数仓的第一站,
是所有数据加工的源头,控制住源头,才能从根本上防止一个重复的数据体系的出现。
事中:通过技术手段捕捉上游元数据与字典值变更,从而方便以后问题追踪与影响分析
对于这种变化,人工处理的话,就是手动在数仓对应的表中增加、修改字段,然后修改同步任务;
这个最好可以搞成自动化的,比如,自动监控上游表结构的变更,变化后,自动去修改数仓中的表结构,自动修改同步任务。
事后:通过事后复盘优化流程与迭代技术

面试题50:离线数仓任务如何保证稳定性和可靠性?

离线数仓通常用于存储、处理和分析大量历史数据。为了保证任务的稳定性,可以采取以下措施:
1.数据备份:定期对数据仓库进行备份,确保在发生故障时可以快速恢复数据。
2.数据质量监控:实施数据质量检测和监控,对数据仓库中的数据进行清洗、整合和校验,以保证数据准确性。
3.分布式架构:采用分布式存储和计算架构,确保数据仓库的扩展性和弹性。在遇到大量请求或负载增加时,可以自动扩展资源来应对。
4.容错机制:实施容错机制,确保在单个节点发生故障时,任务可以继续执行。例如,使用Hadoop等大数据处理框架,它们自带容错和恢复机制。
5.监控与报警:对任务的执行状态进行实时监控,发现异常时及时报警,便于快速诊断和处理问题。
6.任务调度优化:使用任务调度工具(如Apache Airflow、Azkaban等)对任务进行合理的调度,避免任务之间的资源竞争,确保任务按照预期执行。
7.代码质量管理:对开发团队进行代码审查,确保代码质量高,减少因代码问题导致的任务失败。
8.文档和知识库:建立完善的文档和知识库,确保团队成员对系统有深入理解,便于解决问题和提高工作效率。
通过以上措施,可以有效地保证离线数仓任务的稳定性。同时,持续优化和改进系统架构,以适应不断变化的业务需求和技术挑战,是确
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
1月前
|
SQL 前端开发 程序员
【面试题】前端开发中如何高效渲染大数据量?
【面试题】前端开发中如何高效渲染大数据量?
|
1月前
|
设计模式 SQL 算法
大数据面试总结
大数据面试总结
51 0
|
1月前
|
存储 安全 Java
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day03】——JavaSE
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day03】——JavaSE
50 0
|
3天前
|
SQL 大数据
常见大数据面试SQL-每年总成绩都有所提升的学生
一张学生成绩表(student_scores),有year-学年,subject-课程,student-学生,score-分数这四个字段,请完成如下问题: 问题1:每年每门学科排名第一的学生 问题2:每年总成绩都有所提升的学生
|
1月前
|
分布式计算 监控 大数据
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
39 1
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
|
1月前
|
SQL 分布式计算 算法
程序员必备的面试技巧——大数据工程师面试必备技能
程序员必备的面试技巧——大数据工程师面试必备技能
52 0
|
1月前
|
缓存 运维 NoSQL
面试分享:Redis在大数据环境下的缓存策略与实践
【4月更文挑战第10天】探索Redis在大数据缓存的关键作用,本文分享面试经验及必备知识点。聚焦Redis数据结构(String、List、Set、Hash、Sorted Set)及其适用场景,缓存策略(LRU、LFU、TTL)与过期机制,集群和数据分片,以及性能优化和运维技巧。通过代码示例深入理解,助你面试成功,构建高效缓存服务。
74 4
|
1月前
|
Java 大数据
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day04】——JavaSE
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day04】——JavaSE
43 0
|
1月前
|
存储 安全 Java
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day02】——JavaSE
Java大数据面试复习30天冲刺 - 日积月累,每日五题【Day02】——JavaSE
39 0
|
5天前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何查看数据离线同步每天从MySQL抽取的数据量
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。