[转载] Spark Structed Streaming执行过程

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在Struct Streaming中增加了支持sql处理流数据,在sql包中单独处理,其中StreamExecution是下面提到两处流处理的基类,这个流查询在数据源有新数据到达时会生成一个QueryExecution来执行并将结果输出到指定的Sink(处理后数据存放地)中。
本文转自:https://www.jianshu.com/p/dcfc0b6ae0ea

本站转载已经过作者授权。任何形式的转载都请联系原作者(薛定谔的猫Plus获得授权并注明出处



在Struct Streaming中增加了支持sql处理流数据,在sql包中单独处理,其中StreamExecution是下面提到两处流处理的基类,这个流查询在数据源有新数据到达时会生成一个QueryExecution来执行并将结果输出到指定的Sink(处理后数据存放地)中。


webp

MicroBatchExecution

该部分是小批量处理,默认使用ProcessingTimeExecutor这个trigger定时出发,使用的是系统时钟.

case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {

  private val intervalMs = processingTime.intervalMs
  require(intervalMs >= 0)

  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          notifyBatchFallingBehind(batchElapsedTimeMs)
        }
        if (terminated) {
          return
        }
        clock.waitTillTime(nextTriggerTimeMs)
      } else {
        if (terminated) {
          return
        }
      }
    }
  }

该执行逻辑是首先生成一个逻辑计划,标记是从什么数据源抽取数据


override lazy val logicalPlan: LogicalPlan = {
    assert(queryExecutionThread eq Thread.currentThread,
      "logicalPlan must be initialized in QueryExecutionThread " +
        s"but the current thread was ${Thread.currentThread}")
    var nextSourceId = 0L
    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
    val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
    // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
    // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
    // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
    // plan for the data within that batch.
    // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
    // since the existing logical plan has already used those attributes. The per-microbatch
    // transformation is responsible for replacing attributes with their final values.
    val _logicalPlan = analyzedPlan.transform {
      case streamingRelation@StreamingRelation(dataSource, _, output) =>
        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
          // Materialize source to avoid creating it in every batch
          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
          val source = dataSource.createSource(metadataPath)
          nextSourceId += 1
          StreamingExecutionRelation(source, output)(sparkSession)
        })
      case s@StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
        v2ToExecutionRelationMap.getOrElseUpdate(s, {
          // Materialize source to avoid creating it in every batch
          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
          val reader = source.createMicroBatchReader(
            Optional.empty(), // user specified schema
            metadataPath,
            new DataSourceOptions(options.asJava))
          nextSourceId += 1
          StreamingExecutionRelation(reader, output)(sparkSession)
        })
      case s@StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
        v2ToExecutionRelationMap.getOrElseUpdate(s, {
          // Materialize source to avoid creating it in every batch
          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
          if (v1Relation.isEmpty) {
            throw new UnsupportedOperationException(
              s"Data source $sourceName does not support microbatch processing.")
          }
          val source = v1Relation.get.dataSource.createSource(metadataPath)
          nextSourceId += 1
          StreamingExecutionRelation(source, output)(sparkSession)
        })
    }
    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
    uniqueSources = sources.distinct
    _logicalPlan
  }


以kafka为例,在执行过程中构建kafka的offset范围,在populateStartOffsets以及constructNextBatch这两个方法中完成kafka的offset范围,接下来在runBatch中完成数据数据抽取.


newData = reportTimeTaken("getBatch") {
      availableOffsets.flatMap {
        case (source: Source, available)
          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
          val current = committedOffsets.get(source)
          //这部分逻辑基于传入的起始offset范围(包含了每个partition的offset范围)形成一个kafka的DataFrame
          val batch = source.getBatch(current, available)


基于该部分生成的DataFrame,替换最开始logicPlan中的数据源


val newBatchesPlan = logicalPlan transform {
      case StreamingExecutionRelation(source, output) =>
        newData.get(source).map { dataPlan =>
          assert(output.size == dataPlan.output.size,
            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
              s"${Utils.truncatedString(dataPlan.output, ",")}")
          replacements ++= output.zip(dataPlan.output)
          dataPlan
        }.getOrElse {
          LocalRelation(output, isStreaming = true)
        }
    }


后续基于此逻辑计划new一个IncrementalExecution形成执行计划


reportTimeTaken("queryPlanning") {
      lastExecution = new IncrementalExecution(
        sparkSessionToRunBatch,
        triggerLogicalPlan,
        outputMode,
        checkpointFile("state"),
        runId,
        currentBatchId,
        offsetSeqMetadata)
      lastExecution.executedPlan // Force the lazy generation of execution plan
    }

    val nextBatch =
      new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

接下来基于不同的sink进行处理,其中SQLExecution.withNewExecutionId主要是为了跟踪jobs的信息

reportTimeTaken("addBatch") {
      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
        sink match {
          case s: Sink =>
            if (s.isInstanceOf[MemorySinkExtend]) {
              s.addBatch(currentBatchId, nextBatch, batchIdOffsetMap.get(currentBatchId).getOrElse((None, None)))
            } else {
              s.addBatch(currentBatchId, nextBatch, (None, None))
            }
          case _: StreamWriteSupport =>
            // This doesn't accumulate any data - it just forces execution of the microbatch writer.
            nextBatch.collect()
        }
      }
    }


其中遗留一个问题是在计算过程中水印(watermark)的处理如何,我们继续分析。
在执行过程中会随着数据中的事件时更新watermark时间


if (hasNewData) {
      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
      // Update the eventTime watermarks if we find any in the plan.
      if (lastExecution != null) {
        lastExecution.executedPlan.collect {
          case e: EventTimeWatermarkExec => e
        }.zipWithIndex.foreach {
          case (e, index) if e.eventTimeStats.value.count > 0 =>
            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
            val prevWatermarkMs = watermarkMsMap.get(index)
            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
              watermarkMsMap.put(index, newWatermarkMs)
            }


在随后执行阶段,基于该watermark生成表达式,然后在输出数据时进行过滤
//statefulOperators.scala


lazy val watermarkExpression: Option[Expression] = {
    WatermarkSupport.watermarkExpression(
      child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
      eventTimeWatermark)
  }

  /** Predicate based on keys that matches data older than the watermark */
  lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
      Some(newPredicate(e, keyExpressions))
    } else {
      None
    }
  }

  /** Predicate based on the child output that matches data older than the watermark. */
  lazy val watermarkPredicateForData: Option[Predicate] =
    watermarkExpression.map(newPredicate(_, child.output))

在输出阶段,根据输出模式不同,根据watermark时间从HDFSBackedStateStoreProvider中过滤聚合后的数据,以及删除存储的一些聚合数据
webp

ContinusExecution

该执行逻辑与上面类似,只是这部分在保存offset信息是异步方式,流中的数据一直在处理。


相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
20天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
51 0
|
2月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
40 1
|
2月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
66 3
|
4月前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
176 56
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
3月前
|
分布式计算 Apache Spark
|
5月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
98 2
|
3月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
124 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
165 3
|
1月前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
38 3
下一篇
无影云桌面