2021超全大数据面试宝典,吐血总结十万字,大数据面试收藏这一篇就够了 (四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 2021超全大数据面试宝典
  1. 请列举会引起Shuffle过程的Spark算子,并简述功能。


reduceBykey:

groupByKey:

…ByKey:

  1. 简述Spark的两种核心Shuffle(HashShuffle与SortShuffle)的工作流程(包括未优化的HashShuffle、优化的HashShuffle、普通的SortShuffle与bypass的SortShuffle)(重点


未经优化的HashShuffle:

 

优化后的Shuffle:

 

普通的SortShuffle:


     当 shuffle read task 的 数 量 小 于 等 于 spark.shuffle.sort。

bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。

 

  1. Spark常用算子reduceByKey与groupByKey的区别,哪一种更具优势?(重点)

reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。


groupByKey:按照key进行分组,直接进行shuffle。

开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。


  1. Repartition和Coalesce关系与区别


  1. 关系:

两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)


  1. 区别:

repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle

一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce


  1. 分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系


都是做RDD持久化的


cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。

checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。


  1. 简述Spark中共享变量(广播变量和累加器)的基本原理与用途。(重点


累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。


共享变量出现的原因:


通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。


Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。


  1. 当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?


使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。


  1. 简述SparkSQL中RDD、DataFrame、DataSet三者的区别与联系? (笔试重点


  1. RDD


优点:

编译时类型安全

编译时就能检查出类型错误

面向对象的编程风格

直接通过类名点的方式来操作数据


缺点:

序列化和反序列化的性能开销

无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。

GC的性能开销,频繁的创建和销毁对象, 势必会增加GC


  1. DataFrame

DataFrame引入了schema和off-heap

schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。


  1. DataSet

DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。

当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。


三者之间的转换:

 

  1. SparkSQL中join操作与left join操作的区别?


join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。


leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。


部分场景下可以使用left semi join替代left join:


因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是left semi join 中最后 select 的结果中只许出现左表中的列名,因为右表只有 join key 参与关联计算了


  1. SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么?重点


一、基于Receiver的方式


这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。


然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

 

二、基于Direct的方式


这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

 

优点如下:


简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。


高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。


一次且仅一次的事务机制。


三、对比:


基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。


基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。


在实际生产环境中大都用Direct方式


  1. 简述SparkStreaming窗口函数的原理(重点


窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。


图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。


  1. 请手写出wordcount的Spark代码实现(Scala)(手写代码重点)

 

val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

  val sc = new SparkContext(conf)

  sc.textFile("/input")

  .flatMap(_.split(" "))

  .map((_,1))

  .reduceByKey(_+_)

  .saveAsTextFile("/output")

 sc.stop()

 

  1. 如何使用Spark实现topN的获取(描述思路或使用伪代码)(重点)


方法1:

(1)按照key对数据进行聚合(groupByKey)

(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。


方法2:

(1)取出所有的key

(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序


方法3:

(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区

(2)对每个分区运用spark的排序算子进行排序


  1. 京东:调优之前与调优之后性能的详细对比(例如调整map个数,map个数之前多少、之后多少,有什么提升)


这里举个例子。比如我们有几百个文件,会有几百个map出现,读取之后进行join操作,会非常的慢。这个时候我们可以进行coalesce操作,比如240个map,我们合成60个map,也就是窄依赖。这样再shuffle,过程产生的文件数会大大减少。提高join的时间性能。


  1. Flink相关总结


  1. 简单介绍一下Flink


Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。此外,Flink还针对特定的应用领域提供了领域库,例如:Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。


  1. Flink相比Spark Streaming有什么区别?


架构模型上:Spark Streaming 的task运行依赖driver 和 executor和worker,当然driver和excutor还依赖于集群管理器Standalone或者yarn等。而Flink运行时主要是JobManager、TaskManage和TaskSlot。另外一个最核心的区别是:Spark Streaming 是微批处理,运行的时候需要指定批处理的时间,每次运行 job 时处理一个批次的数据;Flink 是基于事件驱动的,事件可以理解为消息。事件驱动的应用程序是一种状态应用程序,它会从一个或者多个流中注入事件,通过触发计算更新状态,或外部动作对注入的事件作出反应。

 

任务调度上:Spark Streaming的调度分为构建 DGA 图,划分 stage,生成 taskset,调度 task等步骤,而Flink首先会生成 StreamGraph,接着生成 JobGraph,然后将 jobGraph 提交给 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的转变,最后由 jobManager 调度执行。

 

时间机制上:flink 支持三种时间机制事件时间,注入时间,处理时间,同时支持 watermark 机制处理滞后数据。Spark Streaming 只支持处理时间,Structured streaming则支持了事件时间和watermark机制。


容错机制上:二者保证exactly-once的方式不同。spark streaming 通过保存offset和事务的方式;Flink 则使用两阶段提交协议来解决这个问题。


  1. Flink中的分区策略有哪几种?


Flink中默认提供了八大分区策略(也叫分区器)。八大分区策略继承关系图

 

ChannelSelector: 接口,决定将记录写入哪个Channel。有3个方法:


  1. void setup(int numberOfChannels): 初始化输出Channel的数量。
  2. int selectChannel(T record): 根据当前记录以及Channel总数,决定应将记录写入下游哪个Channel。八大分区策略的区别主要在这个方法的实现上。
  3. boolean isBroadcast(): 是否是广播模式。决定了是否将记录写入下游所有Channel。
  4. StreamPartitioner:抽象类,也是所有流分区器的基类。

 

注意:


这里以及下边提到的Channel可简单理解为下游Operator的某个实例。Flink 中改变并行度,默认RebalancePartitioner分区策略。分区策略,可在Flink WebUI上直观看出,如REBALANCE,即使用了RebalancePartitioner分区策略;SHUFFLE,即使用了ShufflePartitioner分区策略。

 

  1. GlobalPartitioner: DataStream => DataStream

GlobalPartitioner,GLOBAL分区。将记录输出到下游Operator的第一个实例。


selectChannel实现


public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

//对每条记录,只选择下游operator的第一个Channel

return 0;

}


API使用


dataStream

    .setParallelism(2)

    // 采用GLOBAL分区策略重分区

    .global()

    .print()

    .setParallelism(1);


  1. ShufflePartitioner: DataStream => DataStream

ShufflePartitioner,SHUFFLE分区。将记录随机输出到下游Operator的每个实例。


selectChannel实现


private Random random = new Random();

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

    //对每条记录,随机选择下游operator的某个Channel

     return random.nextInt(numberOfChannels);

}


API使用


dataStream

    .setParallelism(2)

    // 采用SHUFFLE分区策略重分区

    .shuffle()

    .print()

    .setParallelism(4);

 

  1. RebalancePartitioner: DataStream => DataStream

RebalancePartitioner,REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。


selectChannel实现


public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

    //第一条记录,输出到下游的第一个Channel;第二条记录,输出到下游的第二个Channel...如此循环

     nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;

     return nextChannelToSendTo;

}


API使用


dataStream

        .setParallelism(2)

        // 采用REBALANCE分区策略重分区

        .rebalance()

        .print()

        .setParallelism(4);

 

  1. RescalePartitioner: DataStream => DataStream


RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。


selectChannel实现


private int nextChannelToSendTo = -1;

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

     if (++nextChannelToSendTo >= numberOfChannels) {

          nextChannelToSendTo = 0;

     }

     return nextChannelToSendTo;

}


API使用


dataStream

    .setParallelism(2)

    // 采用RESCALE分区策略重分区

    .rescale()

    .print()

    .setParallelism(4);


  1. BroadcastPartitioner: DataStream => DataStream


BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。


selectChannel实现


@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

    //广播分区不支持选择Channel,因为会输出到下游每个Channel中

     throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");

}

@Override

public boolean isBroadcast() {

    //启用广播模式,此时Channel选择器会选择下游所有Channel

     return true;

}


API使用


dataStream

    .setParallelism(2)

    // 采用BROADCAST分区策略重分区

    .broadcast()

    .print()

    .setParallelism(4);


  1. ForwardPartitioner

ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks。


selectChannel实现


@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

     return 0;

}


API使用


dataStream

    .setParallelism(2)

    // 采用FORWARD分区策略重分区

    .forward()

    .print()

    .setParallelism(2);


  1. KeyGroupStreamPartitioner(HASH方式):

KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。


selectChannel实现


@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

     K key;

     try {

          key = keySelector.getKey(record.getInstance().getValue());

     } catch (Exception e) {

          throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);

     }

     return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);

}

// KeyGroupRangeAssignment中的方法

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {

     return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));

}

// KeyGroupRangeAssignment中的方法

public static int assignToKeyGroup(Object key, int maxParallelism) {

     return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);

}

// KeyGroupRangeAssignment中的方法

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {

     return MathUtils.murmurHash(keyHash) % maxParallelism;

}


API使用


dataStream

    .setParallelism(2)

    // 采用HASH分区策略重分区

    .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0)

    .print()

    .setParallelism(4);


  1. CustomPartitionerWrapper

CustomPartitionerWrapper,CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。


selectChannel实现


Partitioner<K> partitioner;

KeySelector<T, K> keySelector;

public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {

     this.partitioner = partitioner;

     this.keySelector = keySelector;

}

@Override

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

     K key;

     try {

          key = keySelector.getKey(record.getInstance().getValue());

     } catch (Exception e) {

          throw new RuntimeException("Could not extract key from " + record.getInstance(), e);

     }

     return partitioner.partition(key, numberOfChannels);

}


自定义分区器将指定的Key分到指定的分区


// 自定义分区器,将不同的Key(用户ID)分到指定的分区

// key: 根据key的值来分区

// numPartitions: 下游算子并行度

static class CustomPartitioner implements Partitioner<String> {

      @Override

      public int partition(String key, int numPartitions) {

          switch (key){

              case "user_1":

                  return 0;

              case "user_2":

                  return 1;

              case "user_3":

                  return 2;

              default:

                  return 3;

          }

      }

  }


使用自定义分区器


dataStream

    .setParallelism(2)

    // 采用CUSTOM分区策略重分区

    .partitionCustom(new CustomPartitioner(),0)

    .print()

    .setParallelism(4);


  1. Flink的并行度有了解吗?Flink中设置并行度需要注意什么?


Flink程序由多个任务(Source、Transformation、Sink)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行度。Flink中人物的并行度可以从多个不同层面设置:操作算子层面(Operator Level)、执行环境层面(Execution Environment Level)、客户端层面(Client Level)、系统层面(System Level)。Flink可以设置好几个level的parallelism,其中包括Operator Level、Execution Environment Level、Client Level、System Level在flink-conf.yaml中通过parallelism.default配置项给所有execution environments指定系统级的默认parallelism;在ExecutionEnvironment里头可以通过setParallelism来给operators、data sources、data sinks设置默认的parallelism;如果operators、data sources、data sinks自己有设置parallelism则会覆盖ExecutionEnvironment设置的parallelism。

 

  1. Flink支持哪几种重启策略?分别如何配置?


重启策略种类:固定延迟重启策略(Fixed Delay Restart Strategy)故障率重启策略(Failure Rate Restart Strategy)无重启策略(No Restart Strategy)Fallback重启策略(Fallback Restart Strategy)


  1. Flink的分布式缓存有什么作用?如何使用?


Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。


此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。


当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。


  1. Flink中的广播变量,使用广播变量需要注意什么事项?


在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量Broadcast便是解决这种情况的。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。


  1. Flink中对窗口的支持包括哪几种?说说他们的使用场景

 

  1. Tumbling Time Window

假如我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。


  1. Sliding Time Window

我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口。


  1. Tumbling Count Window

当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。


  1. Session Window

在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。


  1. Flink 中的 State Backends是什么?有什么作用?分成哪几类?说说他们各自的优缺点?


Flink流计算中可能有各种方式来保存状态:


  1. 窗口操作
  2. 使用了KV操作的函数
  3. 继承了CheckpointedFunction的函数


当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。


Flink内部提供了这些状态后端:

  1. MemoryStateBackend
  2. FsStateBackend
  3. RocksDBStateBackend


如果没有其他配置,系统将使用MemoryStateBackend。


  1. Flink中的时间种类有哪些?各自介绍一下?


Flink中的时间与现实世界中的时间是不一致的,在flink中被划分为事件时间,摄入时间,处理时间三种。如果以EventTime为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime如果以IngesingtTime为基准来定义时间窗口将形成IngestingTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口将形成ProcessingTimeWindow,以operator的systemTime为准。


  1. WaterMark是什么?是用来解决什么问题?如何生成水印?水印的原理是什么?


Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件的,处理乱序事件通常用watermark机制结合window来实现。详细参考https://www.jianshu.com/p/1c2542f11da0


  1. Flink的table和SQL熟悉吗?Table API和SQL中TableEnvironment这个类有什么作用


TableEnvironment是Table API和SQL集成的核心概念。它负责:


  1. 在内部catalog中注册表
  2. 注册外部catalog
  3. 执行SQL查询
  4. 注册用户定义(标量,表或聚合)函数
  5. 将DataStream或DataSet转换为表
  6. 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用

 

  1. Flink如何实现SQL解析的呢?

 

StreamSQL API的执行原理如下:


  1. 用户使用对外提供Stream SQL的语法开发业务应用;
  2. 用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划;
  3. 采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划;
  4. 对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行详细参考:https://cloud.tencent.com/developer/article/1471612

 

  1. Flink是如何做到批处理与流处理统一的?


Flink设计者认为:有限流处理是无限流处理的一种特殊情况,它只不过在某个时间点停止而已。Flink通过一个底层引擎同时支持流处理和批处理。详细参考:

https://cloud.tencent.com/developer/article/1501348


  1. Flink中的数据传输模式是怎么样的?

 

大概的原理,上游的task产生数据后,会写在本地的缓存中,然后通知JM自己的数据已经好了,JM通知下游的Task去拉取数据,下游的Task然后去上游的Task拉取数据,形成链条。


但是在何时通知JM?这里有一个设置,比如pipeline还是blocking,pipeline意味着上游哪怕产生一个数据,也会去通知,blocking则需要缓存的插槽存满了才会去通知,默认是pipeline。


虽然生产数据的是Task,但是一个TaskManager中的所有Task共享一个NetworkEnvironment,下游的Task利用ResultPartitionManager主动去上游Task拉数据,底层利用的是Netty和TCP实现网络链路的传输。


那么,一直都在说Flink的背压是一种自然的方式,为什么是自然的了?


从上面的图中下面的链路中可以看到,当下游的process逻辑比较慢,无法及时处理数据时,他自己的local buffer中的消息就不能及时被消费,进而导致netty无法把数据放入local buffer,进而netty也不会去socket上读取新到达的数据,进而在tcp机制中,tcp也不会从上游的socket去读取新的数据,上游的netty也是一样的逻辑,它无法发送数据,也就不能从上游的localbuffer中消费数据,所以上游的localbuffer可能就是满的,上游的operator或者process在处理数据之后进行collect.out的时候申请不能本地缓存,导致上游的process被阻塞。这样,在这个链路上,就实现了背压。


如果还有相应的上游,则会一直反压上去,一直影响到source,导致source也放慢从外部消息源读取消息的速度。一旦瓶颈解除,网络链路畅通,则背压也会自然而然的解除。


  1. Flink的容错机制

Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。

 

详细参考:https://www.jianshu.com/p/1fca8fb61f86


  1. Flink在使用Window时出现数据倾斜,你有什么解决办法?

注意:这里window产生的数据倾斜指的是不同的窗口内积攒的数据量不同,主要是由源头数据的产生速度导致的差异。核心思路:1.重新设计key 2.在窗口计算前做预聚合可以参考这个:https://blog.csdn.net/it_lee_j_h/article/details/88641894


  1. Flink任务,delay极高,请问你有什么调优策略?

首先要确定问题产生的原因,找到最耗时的点,确定性能瓶颈点。比如任务频繁反压,找到反压点。主要通过:资源调优、作业参数调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。


  1. 业务交互数据分析
  2. 电商常识

SKU(库存量单位):一台银色、128G内存的、支持联通网络的iPhoneX

SPU(标准产品单位):iPhoneX

  1. 电商业务流程

 

  1. 业务表关键字段


  1. 订单表(order_info)


标签

含义

id

订单编号

total_amount

订单金额

order_status

订单状态

user_id

用户id

payment_way

支付方式

out_trade_no

支付流水号

create_time

创建时间

operate_time

操作时间

     


  1. 订单详情表(order_detail)


标签

含义

id

订单编号

order_id

订单号

user_id

用户id

sku_id

商品id

sku_name

商品名称

order_price

商品价格

sku_num

商品数量

create_time

创建时间

     


  1. 商品表


标签

含义

id

skuId

spu_id

spuid

price

价格

sku_name

商品名称

sku_desc

商品描述

weight

重量

tm_id

品牌id

category3_id

品类id

create_time

创建时间

     


  1. 用户表


标签

含义

id

用户id

name

姓名

birthday

生日

gender

性别

email

邮箱

user_level

用户等级

create_time

创建时间

     


  1. 商品一级分类表


标签

含义

id

id

name

名称

     


  1. 商品二级分类表


标签

含义

id

id

name

名称

category1_id

一级品类id

     


  1. 商品三级分类表


标签

含义

id

id

name

名称

Category2_id

二级品类id

     


  1. 支付流水表


标签

含义

id

编号

out_trade_no

对外业务编号

order_id

订单编号

user_id

用户编号

alipay_trade_no

支付宝交易流水编号

total_amount

支付金额

subject

交易内容

payment_type

支付类型

payment_time

支付时间

     


订单表跟订单详情表有什么区别?


   订单表的订单状态会变化,订单详情表不会,因为没有订单状态。


   订单表记录user_id,订单id订单编号,订单的总金额order_status,支付方式,订单状态等。


   订单详情表记录user_id,商品sku_id ,具体的商品信息(商品名称sku_name,价格order_price,数量sku_num)

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
5月前
|
SQL 大数据
常见大数据面试SQL-每年总成绩都有所提升的学生
一张学生成绩表(student_scores),有year-学年,subject-课程,student-学生,score-分数这四个字段,请完成如下问题: 问题1:每年每门学科排名第一的学生 问题2:每年总成绩都有所提升的学生
|
6月前
|
分布式计算 监控 大数据
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
111 1
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
|
6月前
|
SQL 分布式计算 算法
程序员必备的面试技巧——大数据工程师面试必备技能
程序员必备的面试技巧——大数据工程师面试必备技能
118 0
|
6月前
|
缓存 运维 NoSQL
面试分享:Redis在大数据环境下的缓存策略与实践
【4月更文挑战第10天】探索Redis在大数据缓存的关键作用,本文分享面试经验及必备知识点。聚焦Redis数据结构(String、List、Set、Hash、Sorted Set)及其适用场景,缓存策略(LRU、LFU、TTL)与过期机制,集群和数据分片,以及性能优化和运维技巧。通过代码示例深入理解,助你面试成功,构建高效缓存服务。
177 4
|
6月前
|
移动开发 前端开发 JavaScript
【前端面试】前端面试题300道~~熬夜吐血整理,2024年最新大厂面试经验分享稿
【前端面试】前端面试题300道~~熬夜吐血整理,2024年最新大厂面试经验分享稿
|
3月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
16天前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
17天前
|
存储 缓存 Java
大厂面试必看!Java基本数据类型和包装类的那些坑
本文介绍了Java中的基本数据类型和包装类,包括整数类型、浮点数类型、字符类型和布尔类型。详细讲解了每种类型的特性和应用场景,并探讨了包装类的引入原因、装箱与拆箱机制以及缓存机制。最后总结了面试中常见的相关考点,帮助读者更好地理解和应对面试中的问题。
41 4
|
1月前
|
算法 Java 数据中心
探讨面试常见问题雪花算法、时钟回拨问题,java中优雅的实现方式
【10月更文挑战第2天】在大数据量系统中,分布式ID生成是一个关键问题。为了保证在分布式环境下生成的ID唯一、有序且高效,业界提出了多种解决方案,其中雪花算法(Snowflake Algorithm)是一种广泛应用的分布式ID生成算法。本文将详细介绍雪花算法的原理、实现及其处理时钟回拨问题的方法,并提供Java代码示例。
74 2
|
1月前
|
JSON 安全 前端开发
第二次面试总结 - 宏汉科技 - Java后端开发
本文是作者对宏汉科技Java后端开发岗位的第二次面试总结,面试结果不理想,主要原因是Java基础知识掌握不牢固,文章详细列出了面试中被问到的技术问题及答案,包括字符串相关函数、抽象类与接口的区别、Java创建线程池的方式、回调函数、函数式接口、反射以及Java中的集合等。
31 0
下一篇
无影云桌面