Flink教程(30)- Flink VS Spark(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(30)- Flink VS Spark(下)

2.7 kafka 动态分区检测

2.7.1 Spark Streaming

Spark Streaming:对于有实时处理业务需求的企业,随着业务增长数据量也会同步增长,将导致原有的 kafka 分区数不满足数据写入所需的并发度,需要扩展 kafka 的分区或者增加 kafka 的 topic,这时就要求实时处理程序,如 SparkStreaming、flink 能检测到 kafka 新增的 topic 、分区及消费新增分区的数据。

接下来结合源码分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition 时能否动态发现新增分区并消费处理新增分区的数据。 Spark Streaming 与 kafka 结合有两个区别比较大的版本,如图所示是官网给出的对比数据:

其中确认的是 Spark Streaming 与 kafka 0.8 版本结合不支持动态分区检测,与 0.10 版本结合支持,接着通过源码分析。

Spark Streaming 与 kafka 0.8 版本结合(源码分析只针对是否分区检测),入口是 DirectKafkaInputDStream 的 compute:

//    改行代码会计算这个job,要消费的每个kafka分区的最大偏移
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
   //    构建KafkaRDD,用指定的分区数和要消费的offset范围
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
   val rdd = KafkaRDD[K, V, U, T, R](
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    
   val offsetRanges = currentOffsets.map { case (tp, fo) =>
     val uo = untilOffsets(tp)      
     OffsetRange(tp.topic, tp.partition, fo, uo.offset)
   }    
   val description = offsetRanges.filter { offsetRange =>
     // Don't display empty ranges.
     offsetRange.fromOffset != offsetRange.untilOffset
   }.map { offsetRange =>
     // Copy offsetRanges to immutable.List to prevent from being modified by the user
     s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
       s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
   }.mkString("\n")    
   val metadata = Map("offsets" -> offsetRanges.toList,      
        StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    
   val inputInfo = StreamInputInfo(id, rdd.count, metadata)
   ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    
   Some(rdd)
}

第一行就是计算得到该批次生成 KafkaRDD 每个分区要消费的最大 offset。 接着看 latestLeaderOffsets(maxRetries):

// 可以看到的是用来指定获取最大偏移分区的列表还是只有currentOffsets,没有发现关于新增的分区的内容。
@tailrec  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
   val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)    
   // Either.fold would confuse @tailrec, do it manually
   if (o.isLeft) {      
      val err = o.left.get.toString      
      if (retries <= 0) {        
        throw new SparkException(err)
      } else {
       logError(err)        
       Thread.sleep(kc.config.refreshLeaderBackoffMs)
       latestLeaderOffsets(retries - 1)
     }
   } else {
     o.right.get
   }
 }

其中 protected var currentOffsets = fromOffsets,这个仅仅是在构建 DirectKafkaInputDStream 的时候初始化,并在 compute 里面更新:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

中间没有检测 kafka 新增 topic 或者分区的代码,所以可以确认 Spark Streaming 与 kafka 0.8 的版本结合不支持动态分区检测。

Spark Streaming 与 kafka 0.10 版本结合:入口同样是 DirectKafkaInputDStream 的 compute 方法,捡主要的部分说,Compute 里第一行也是计算当前 job 生成 kafkardd 要消费的每个分区的最大 offset:

//    获取当前生成job,要用到的KafkaRDD每个分区最大消费偏移值
   val untilOffsets = clamp(latestOffsets())

具体检测 kafka 新增 topic 或者分区的代码在 latestOffsets()

/**   
* Returns the latest (highest) available offsets, taking new partitions into account.   
*/
protected def latestOffsets(): Map[TopicPartition, Long] = {    
   val c = consumer
   paranoidPoll(c)    // 获取所有的分区信息
   // make sure new partitions are reflected in currentOffsets
   val parts = c.assignment().asScala    
   // 做差获取新增的分区信息
   val newPartitions = parts.diff(currentOffsets.keySet)    
   // position for new partitions determined by auto.offset.reset if no commit
   // 新分区消费位置,没有记录的化是由auto.offset.reset决定
   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap    
   // don't want to consume messages, so pause
   c.pause(newPartitions.asJava)    // find latest available offsets
   c.seekToEnd(currentOffsets.keySet.asJava)
   parts.map(tp => tp -> c.position(tp)).toMap
}

该方法内有获取 kafka 新增分区,并将其更新到 currentOffsets 的过程,所以可以验证 Spark Streaming 与 kafka 0.10 版本结合支持动态分区检测。

2.7.2 Flink

入口类是 FlinkKafkaConsumerBase,该类是所有 flink 的 kafka 消费者的父类。

在 FlinkKafkaConsumerBase 的 run 方法中,创建了 kafkaFetcher,实际上就是消费者:

this.kafkaFetcher = createFetcher(
       sourceContext,
       subscribedPartitionsToStartOffsets,
       periodicWatermarkAssigner,
       punctuatedWatermarkAssigner,
       (StreamingRuntimeContext) getRuntimeContext(),
       offsetCommitMode,
       getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
       useMetrics);

接是创建了一个线程,该线程会定期检测 kafka 新增分区,然后将其添加到 kafkaFetcher 里。

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {      
  final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();      
  this.discoveryLoopThread = new Thread(new Runnable() {        
      @Override
      public void run() {          
          try {            
          // --------------------- partition discovery loop ---------------------
           List<KafkaTopicPartition> discoveredPartitions;            
           // throughout the loop, we always eagerly check if we are still running before
           // performing the next operation, so that we can escape the loop as soon as possible
           while (running) {              
             if (LOG.isDebugEnabled()) {                
               LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
             }              
             try {
               discoveredPartitions = partitionDiscoverer.discoverPartitions();
             } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { 
               // the partition discoverer may have been closed or woken up before or during the discovery;
               // this would only happen if the consumer was canceled; simply escape the loop
               break;
             } 
             // no need to add the discovered partitions if we were closed during the meantime
             if (running && !discoveredPartitions.isEmpty()) {
               kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
             } 
             // do not waste any time sleeping if we're not running anymore
             if (running && discoveryIntervalMillis != 0) {                
               try {                  
                 Thread.sleep(discoveryIntervalMillis);
               } catch (InterruptedException iex) {
                 // may be interrupted if the consumer was canceled midway; simply escape the loop
                 break;
               }
             }
           }
         } catch (Exception e) {
           discoveryLoopErrorRef.set(e);
         } finally {            
           // calling cancel will also let the fetcher loop escape
           // (if not running, cancel() was already called)
           if (running) {
             cancel();
           }
         }
       }
     }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
}
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();

上面,就是 flink 动态发现 kafka 新增分区的过程。不过与 Spark 无需做任何配置不同的是,flink 动态发现 kafka 新增分区,这个功能时需要被开启的。也很简单,需要将 flink.partition-discovery.interval-millis 该属性设置为大于 0 即可。

2.8 容错机制及处理语义

抛出一个问题:实时处理的时候,如何保证数据仅一次处理语义?

2.8.1 Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:

  • repartition(1) : Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:
rdd.repartition(1).foreachPartition(partition=>{    
        //    开启事务
       partition.foreach(each=>{//提交数据
       })    //  提交事务
   })
 })

将结果和 offset 一起提交: 也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

2.8.2 Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图 11 所示包含以下组件:

  • 一个source,从Kafka中读取数据(即KafkaConsumer)
  • 一个时间窗口化的聚会操作
  • 一个sink,将结果写回到Kafka(即KafkaProducer)

下面详细讲解 flink 的两段提交思路:

如图所示,Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交,过程可见图。

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图:

当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

以上就是 flink 实现恰一次处理的基本逻辑。

2.9 Back pressure背压/反压

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。

  • back pressure 后面一律称为背压。

2.9.1 Spark Streaming 的背压

Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset。

PIDRateEsimator 的 compute 方法如下:

def compute(time: Long, // in milliseconds
     numElements: Long,      
     processingDelay: Long, // in milliseconds
     schedulingDelay: Long // in milliseconds
   ): Option[Double] = {
   logTrace(s"\ntime = $time, # records = $numElements, " +
     s"processing time = $processingDelay, scheduling delay = $schedulingDelay")    
     this.synchronized {      
       if (time > latestTime && numElements > 0 && processingDelay > 0) {        
       val delaySinceUpdate = (time - latestTime).toDouble / 1000
       val processingRate = numElements.toDouble / processingDelay * 1000
       val error = latestRate - processingRate        
       val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis        
       // in elements/(second ^ 2)
       val dError = (error - latestError) / delaySinceUpdate       
       val newRate = (latestRate - proportional * error -
                                   integral * historicalError -
                                   derivative * dError).max(minRate)
       logTrace(s"""  | latestRate = $latestRate, error = $error            
                      | latestError = $latestError, historicalError = $historicalError            
                      | delaySinceUpdate = $delaySinceUpdate, dError = $dError           
                 """.stripMargin)
       latestTime = time        
       if (firstRun) {
         latestRate = processingRate
         latestError = 0D
         firstRun = false
         logTrace("First run, rate estimation skipped")         
         None
       } else {
         latestRate = newRate
         latestError = error
         logTrace(s"New rate = $newRate")          
         Some(newRate)
       }
     } else {
       logTrace("Rate estimation skipped")        
       None
     }
   }
 }

2.9.2 Flink 的背压

与 Spark Streaming 的背压不同的是,Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息

jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 所示:

阻塞占比在 web 上划分了三个等级:

  • OK: 0 <= Ratio <= 0.10,表示状态良好;
  • LOW: 0.10 < Ratio <= 0.5,表示有待观察;
  • HIGH: 0.5 < Ratio <= 1,表示要处理了。

例如,0.01,代表着100次中有一次阻塞在内部调用

03 文末

本文主要讲解了Flink与Spark的对比,谢谢大家的阅读,本文完!


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
165 1
|
19天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
53 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
20天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
zdl
|
6天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
24 0
|
4月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
406 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
3月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
292 2
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
205 0
|
4月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
分布式计算 Java Linux
【Deepin 20系统】Linux 系统安装Spark教程及使用
在Deepin 20系统上安装和使用Apache Spark的详细教程,包括安装Java JDK、下载和解压Spark安装包、配置环境变量和Spark配置文件、启动和关闭Spark集群的步骤,以及使用Spark Shell和PySpark进行简单操作的示例。
67 0

热门文章

最新文章