作者:Genmao Yu
原文链接:https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.html
编译:邵嘉阳,计算机科学与技术大三在读,Apache Spark 中文社区志愿者
在Apache Spark 2.0中,我们迎来了Structured Streaming——构建分布式流处理应用的最佳平台。统一的API(SQL,Dataset和DataFrame)以及Spark内置的大量函数为开发者实现复杂的需求提供了便利,比如流的聚合,流-流连接和窗口支持。开发者们普遍喜欢通过Spark Streaming中的DStream的方式来管理他们的流,那么类似的功能什么时候能在Structured Streaming中得到实现呢?这不,在Apache Spark 3.0中,全新的Structured Streaming可视化UI和开发者们见面了。
新的Structured Streaming UI会提供一些有用的信息和统计数据,以此来监视所有流作业,便于在开发调试过程中排除故障。同时,开发者还能够获得实时的监测数据,这能使生产流程更直观。在这个新的UI中,我们会看到两组统计数据:1)流查询作业的聚合信息;2)流查询的具体统计信息,包括输入速率(Input Rate)、处理速率(Process Rate)、输入行数(Input Rows)、批处理持续时间(Batch Duration)和操作持续时间(Operation Duration)等。
流查询作业的聚合信息
开发者提交的流SQL查询会被列在Structured Streaming一栏中,包括正在运行的流查询(active)和已完成的流查询(completed)。结果表则会显示流查询的一些基本信息,包括查询名称、状态、ID、运行ID、提交时间、查询持续时间、最后一批的ID以及一些聚合信息,如平均输入速率和平均处理速率。流查询有三种状态:运行(RUNNING)、结束(FINISHED)、失败(FAILED)。所有结束(FINISHED)和失败(FAILED)的查询都在已完成的流式查询表中列出。Error列显示有关失败查询的详细信息。
我们可以通过单击Run ID链接查看流查询的详细信息。
详细的统计信息
Statistics页面显示了包括输入速率、处理速率、延迟和详细的操作持续时间在内的一系列指标。通过图表,开发者能全面了解已提交的流查询的状态,并且轻松地调试查询处理中的异常情况。
它包含以下指标:
- Input Rate:数据到达的聚合速率(跨所有源)。
- Process Rate: Spark处理数据的聚合速率(跨所有源)。
- Batch Duration: 每一批的处理时间。
- Operation Duration: 执行各种操作所花费的时间(以毫秒为单位)。
被追踪的操作罗列如下: - addBatch:从源读取微批的输入数据、对其进行处理并将批的输出写入接收器所花费的时间。这应该会占用微批处理的大部分时间。
- getBatch:准备逻辑查询以从源读取当前微批的输入所花费的时间。
- getOffset:查询源是否有新的输入数据所花费的时间。
- walCommit:将偏移量写入元数据日志。
- queryPlanning:生成执行计划。
需要注意的是,由于数据源的类型不同,一个查询可能不会包含以上列出的所有操作。
使用UI解决流的性能故障
在这一部分中,我们会看到新的UI是怎样实时、直观地显示查询执行过程中的异常情况的。我们会在每个例子中预先假设一些条件,样例查询看起来是这样的:
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
由于处理能力不足而增加延迟
在第一种情况下,我们希望尽快处理Apache Kafka数据。在每一批中,流作业将处理Kafka中所有可用的数据。如果处理能力不足以处理批数据,那么延迟将迅速增加。最直观的现象是Input Rows和Batch Duration会呈线性上升。Process Rate提示流作业每秒最多只能处理大约8000条记录,但是当前的输入速率是每秒大约20000条记录。产生问题的原因一目了然,那么我们可以为流作业提供更多的执行资源,或者添加足够的分区来处理与生产者匹配所需的所有消费者。
稳定但高延迟
第二种情况下,延迟并没有持续增加,而是保持稳定,如下截图所示:
我们发现在相同的Input Rate下,Process Rate可以保持稳定。这意味着作业的处理能力足以处理输入数据。然而,每批的延迟仍然高达20秒。这里,高延迟的主要原因是每个批中有太多数据,那么我们可以通过增加这个作业的并行度来减少延迟。在为Spark任务添加了10个Kafka分区和10个内核之后,我们发现延迟大约为5秒——比20秒要好得多。
使用操作持续时间图进行故障排除
操作持续时间图(Operation Duration Chart)显示了执行各种操作所花费的时间(以毫秒为单位)。这对于了解每个批处理的时间分布和故障排除非常有用。让我们以Apache Spark社区中的性能改进“Spark-30915:在查找最新批处理ID时避免读取元数据日志文件“为例。
在某次查询中我们发现,当压缩后的元数据日志很大时,下一批要花费比其他批更多的时间来处理。
在进行代码审查之后,我们发现这是由对压缩日志文件的不必要读取造成的并进行了修复。新的操作持续时间图确认了我们想法:
未来的开发方向
如上所示,新的Structured Streaming UI将通过提供更有用的流查询信息帮助开发者更好地监视他们的流作业。作为早期发布版本,新的UI仍在开发中,并将在未来的发布中得到改进。有几个未来可以实现的功能,包括但不限于:
- 更多的流查询执行细节:延迟数据,水印,状态数据指标等等。
- 在Spark历史服务器中支持Structured Streaming UI。
- 对于不寻常的情况有更明显的提示:发生延迟等。
近期活动:
8月24日开始 Spark 实战训练营正式开课
免费报名链接:https://developer.aliyun.com/learning/trainingcamp/spark/2