Flink教程(30)- Flink VS Spark(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(30)- Flink VS Spark(下)

2.7 kafka 动态分区检测

2.7.1 Spark Streaming

Spark Streaming:对于有实时处理业务需求的企业,随着业务增长数据量也会同步增长,将导致原有的 kafka 分区数不满足数据写入所需的并发度,需要扩展 kafka 的分区或者增加 kafka 的 topic,这时就要求实时处理程序,如 SparkStreaming、flink 能检测到 kafka 新增的 topic 、分区及消费新增分区的数据。

接下来结合源码分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition 时能否动态发现新增分区并消费处理新增分区的数据。 Spark Streaming 与 kafka 结合有两个区别比较大的版本,如图所示是官网给出的对比数据:

其中确认的是 Spark Streaming 与 kafka 0.8 版本结合不支持动态分区检测,与 0.10 版本结合支持,接着通过源码分析。

Spark Streaming 与 kafka 0.8 版本结合(源码分析只针对是否分区检测),入口是 DirectKafkaInputDStream 的 compute:

//    改行代码会计算这个job,要消费的每个kafka分区的最大偏移
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
   //    构建KafkaRDD,用指定的分区数和要消费的offset范围
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
   val rdd = KafkaRDD[K, V, U, T, R](
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    
   val offsetRanges = currentOffsets.map { case (tp, fo) =>
     val uo = untilOffsets(tp)      
     OffsetRange(tp.topic, tp.partition, fo, uo.offset)
   }    
   val description = offsetRanges.filter { offsetRange =>
     // Don't display empty ranges.
     offsetRange.fromOffset != offsetRange.untilOffset
   }.map { offsetRange =>
     // Copy offsetRanges to immutable.List to prevent from being modified by the user
     s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
       s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
   }.mkString("\n")    
   val metadata = Map("offsets" -> offsetRanges.toList,      
        StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    
   val inputInfo = StreamInputInfo(id, rdd.count, metadata)
   ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    
   Some(rdd)
}

第一行就是计算得到该批次生成 KafkaRDD 每个分区要消费的最大 offset。 接着看 latestLeaderOffsets(maxRetries):

// 可以看到的是用来指定获取最大偏移分区的列表还是只有currentOffsets,没有发现关于新增的分区的内容。
@tailrec  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
   val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)    
   // Either.fold would confuse @tailrec, do it manually
   if (o.isLeft) {      
      val err = o.left.get.toString      
      if (retries <= 0) {        
        throw new SparkException(err)
      } else {
       logError(err)        
       Thread.sleep(kc.config.refreshLeaderBackoffMs)
       latestLeaderOffsets(retries - 1)
     }
   } else {
     o.right.get
   }
 }

其中 protected var currentOffsets = fromOffsets,这个仅仅是在构建 DirectKafkaInputDStream 的时候初始化,并在 compute 里面更新:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

中间没有检测 kafka 新增 topic 或者分区的代码,所以可以确认 Spark Streaming 与 kafka 0.8 的版本结合不支持动态分区检测。

Spark Streaming 与 kafka 0.10 版本结合:入口同样是 DirectKafkaInputDStream 的 compute 方法,捡主要的部分说,Compute 里第一行也是计算当前 job 生成 kafkardd 要消费的每个分区的最大 offset:

//    获取当前生成job,要用到的KafkaRDD每个分区最大消费偏移值
   val untilOffsets = clamp(latestOffsets())

具体检测 kafka 新增 topic 或者分区的代码在 latestOffsets()

/**   
* Returns the latest (highest) available offsets, taking new partitions into account.   
*/
protected def latestOffsets(): Map[TopicPartition, Long] = {    
   val c = consumer
   paranoidPoll(c)    // 获取所有的分区信息
   // make sure new partitions are reflected in currentOffsets
   val parts = c.assignment().asScala    
   // 做差获取新增的分区信息
   val newPartitions = parts.diff(currentOffsets.keySet)    
   // position for new partitions determined by auto.offset.reset if no commit
   // 新分区消费位置,没有记录的化是由auto.offset.reset决定
   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap    
   // don't want to consume messages, so pause
   c.pause(newPartitions.asJava)    // find latest available offsets
   c.seekToEnd(currentOffsets.keySet.asJava)
   parts.map(tp => tp -> c.position(tp)).toMap
}

该方法内有获取 kafka 新增分区,并将其更新到 currentOffsets 的过程,所以可以验证 Spark Streaming 与 kafka 0.10 版本结合支持动态分区检测。

2.7.2 Flink

入口类是 FlinkKafkaConsumerBase,该类是所有 flink 的 kafka 消费者的父类。

在 FlinkKafkaConsumerBase 的 run 方法中,创建了 kafkaFetcher,实际上就是消费者:

this.kafkaFetcher = createFetcher(
       sourceContext,
       subscribedPartitionsToStartOffsets,
       periodicWatermarkAssigner,
       punctuatedWatermarkAssigner,
       (StreamingRuntimeContext) getRuntimeContext(),
       offsetCommitMode,
       getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
       useMetrics);

接是创建了一个线程,该线程会定期检测 kafka 新增分区,然后将其添加到 kafkaFetcher 里。

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {      
  final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();      
  this.discoveryLoopThread = new Thread(new Runnable() {        
      @Override
      public void run() {          
          try {            
          // --------------------- partition discovery loop ---------------------
           List<KafkaTopicPartition> discoveredPartitions;            
           // throughout the loop, we always eagerly check if we are still running before
           // performing the next operation, so that we can escape the loop as soon as possible
           while (running) {              
             if (LOG.isDebugEnabled()) {                
               LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
             }              
             try {
               discoveredPartitions = partitionDiscoverer.discoverPartitions();
             } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { 
               // the partition discoverer may have been closed or woken up before or during the discovery;
               // this would only happen if the consumer was canceled; simply escape the loop
               break;
             } 
             // no need to add the discovered partitions if we were closed during the meantime
             if (running && !discoveredPartitions.isEmpty()) {
               kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
             } 
             // do not waste any time sleeping if we're not running anymore
             if (running && discoveryIntervalMillis != 0) {                
               try {                  
                 Thread.sleep(discoveryIntervalMillis);
               } catch (InterruptedException iex) {
                 // may be interrupted if the consumer was canceled midway; simply escape the loop
                 break;
               }
             }
           }
         } catch (Exception e) {
           discoveryLoopErrorRef.set(e);
         } finally {            
           // calling cancel will also let the fetcher loop escape
           // (if not running, cancel() was already called)
           if (running) {
             cancel();
           }
         }
       }
     }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
}
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();

上面,就是 flink 动态发现 kafka 新增分区的过程。不过与 Spark 无需做任何配置不同的是,flink 动态发现 kafka 新增分区,这个功能时需要被开启的。也很简单,需要将 flink.partition-discovery.interval-millis 该属性设置为大于 0 即可。

2.8 容错机制及处理语义

抛出一个问题:实时处理的时候,如何保证数据仅一次处理语义?

2.8.1 Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:

  • repartition(1) : Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:
rdd.repartition(1).foreachPartition(partition=>{    
        //    开启事务
       partition.foreach(each=>{//提交数据
       })    //  提交事务
   })
 })

将结果和 offset 一起提交: 也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

2.8.2 Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图 11 所示包含以下组件:

  • 一个source,从Kafka中读取数据(即KafkaConsumer)
  • 一个时间窗口化的聚会操作
  • 一个sink,将结果写回到Kafka(即KafkaProducer)

下面详细讲解 flink 的两段提交思路:

如图所示,Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交,过程可见图。

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图:

当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

以上就是 flink 实现恰一次处理的基本逻辑。

2.9 Back pressure背压/反压

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。

  • back pressure 后面一律称为背压。

2.9.1 Spark Streaming 的背压

Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset。

PIDRateEsimator 的 compute 方法如下:

def compute(time: Long, // in milliseconds
     numElements: Long,      
     processingDelay: Long, // in milliseconds
     schedulingDelay: Long // in milliseconds
   ): Option[Double] = {
   logTrace(s"\ntime = $time, # records = $numElements, " +
     s"processing time = $processingDelay, scheduling delay = $schedulingDelay")    
     this.synchronized {      
       if (time > latestTime && numElements > 0 && processingDelay > 0) {        
       val delaySinceUpdate = (time - latestTime).toDouble / 1000
       val processingRate = numElements.toDouble / processingDelay * 1000
       val error = latestRate - processingRate        
       val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis        
       // in elements/(second ^ 2)
       val dError = (error - latestError) / delaySinceUpdate       
       val newRate = (latestRate - proportional * error -
                                   integral * historicalError -
                                   derivative * dError).max(minRate)
       logTrace(s"""  | latestRate = $latestRate, error = $error            
                      | latestError = $latestError, historicalError = $historicalError            
                      | delaySinceUpdate = $delaySinceUpdate, dError = $dError           
                 """.stripMargin)
       latestTime = time        
       if (firstRun) {
         latestRate = processingRate
         latestError = 0D
         firstRun = false
         logTrace("First run, rate estimation skipped")         
         None
       } else {
         latestRate = newRate
         latestError = error
         logTrace(s"New rate = $newRate")          
         Some(newRate)
       }
     } else {
       logTrace("Rate estimation skipped")        
       None
     }
   }
 }

2.9.2 Flink 的背压

与 Spark Streaming 的背压不同的是,Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息

jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 所示:

阻塞占比在 web 上划分了三个等级:

  • OK: 0 <= Ratio <= 0.10,表示状态良好;
  • LOW: 0.10 < Ratio <= 0.5,表示有待观察;
  • HIGH: 0.5 < Ratio <= 1,表示要处理了。

例如,0.01,代表着100次中有一次阻塞在内部调用

03 文末

本文主要讲解了Flink与Spark的对比,谢谢大家的阅读,本文完!


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
SQL 分布式计算 Spark
Spark 教程系列
Spark 教程系列
22 0
|
2月前
|
流计算
JD Flink教程
JD Flink教程
17 0
|
2月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
90 0
|
2月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
76 0
|
1月前
|
消息中间件 分布式计算 数据处理
Flink与Spark的区别是什么?请举例说明。
Flink与Spark的区别是什么?请举例说明。
24 0
|
2月前
|
Apache 流计算
Apache Flink教程----2.本地开发
Apache Flink教程----2.本地开发
30 0
|
2月前
|
SQL 分布式计算 Java
2021年最新最全Flink系列教程__Flink综合案例(九)
2021年最新最全Flink系列教程__Flink综合案例(九)
24 0
|
2月前
|
消息中间件 NoSQL 数据挖掘
2021年最新最全Flink系列教程__Flink高级特性和新特性(八)
2021年最新最全Flink系列教程__Flink高级特性和新特性(八)
21 0
|
2月前
|
消息中间件 SQL Kafka
2021年最新最全Flink系列教程__FlinkTable&SQL(六、七)
2021年最新最全Flink系列教程__FlinkTable&SQL(六、七)
25 0
|
2月前
|
存储 消息中间件 Kafka
2021年最新最全Flink系列教程__Flink容错机制(五)
2021年最新最全Flink系列教程__Flink容错机制(五)
29 0