Spark 3.0 终于支持 event logs 滚动了

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Spark 的 event log 为什么不可以提供类似功能呢?值得高兴的是,即将发布的 Spark 3.0 为我们带来了这个功能(具体参见 SPARK-28594)。当然,对待 Spark 的 event log 不能像其他普通应用程序的日志那样,简单切割,然后删除很早之前的日志,而需要保证 Spark 的历史服务器能够解析已经 Roll 出来的日志,并且在 Spark UI 中展示出来,以便我们进行一些查错、调优等。

背景

相信经常使用 Spark 的同学肯定知道 Spark 支持将作业的 event log 保存到持久化设备。默认这个功能是关闭的,不过我们可以通过 spark.eventLog.enabled 参数来启用这个功能,并且通过 spark.eventLog.dir 参数来指定 event log 保存的地方,可以是本地目录或者 HDFS 上的目录,不过一般我们都会将它设置成 HDFS 上的一个目录。

但是这个功能有个问题,就是这个 Spark Job 运行的过程中产生的所有 event log 都是写到单个文件中,这就导致了 event log 文件的大小和这个 Spark Job 的并行度、复杂度以及运行的时间有很大关系。如果我们是运行 Spark Streaming 作业,这个问题特别明显,我们经常看到某个 Spark Streaming 作业的 event log 达到几十 GB 大小!我们没办法清理或者删除一些不需要的事件日志,当我们使用 Spark 历史服务器打开这个几十 GB 大小的 event log,打开速度可想而知。

如果大家经常使用 Log4j 的话,Log4j 提供了一个 RollingFileAppender,可以使长时间运行应用的日志按照时间或者日志文件大小进行切割,从而达到限制单个日志文件的大小。Spark 的 event log 为什么不可以提供类似功能呢?值得高兴的是,即将发布的 Spark 3.0 为我们带来了这个功能(具体参见 SPARK-28594)。当然,对待 Spark 的 event log 不能像其他普通应用程序的日志那样,简单切割,然后删除很早之前的日志,而需要保证 Spark 的历史服务器能够解析已经 Roll 出来的日志,并且在 Spark UI 中展示出来,以便我们进行一些查错、调优等。

如何使用

事件日志滚动

首先必须使用 Spark 3.0,同时将 spark.eventLog.rolling.enabled 设置为 true(默认是 false)。那么 Spark 在 writeEvent 的时候会判断当前在写的 event log 文件大小加上现在新来的事件日志大小总和是否大于 spark.eventLog.rolling.maxFileSize 参数配置的值,如果满足将启动 event log roll 操作。

事件日志压缩

所谓事件日志压缩就是将多个滚动出来的事件日志文件合并到一个压缩的文件中。日志压缩涉及到的参数有 spark.history.fs.eventLog.rolling.maxFilesToRetain 和 spark.history.fs.eventLog.rolling.compaction.score.threshold。第一个参数的意思是进行 Compaction 之后需要保存多少个 event logs 为不压缩的状态,这个参数的默认值是 Int.MaxValue。也就是默认其实不启用事件日志 Compaction,所有 event logs 都将不会被 Compaction 到一个文件里面。

需要注意的:

•event logs 的 Compaction 操作是在 Spark 历史服务器端进行的,而且是在 Spark 历史服务器检查到有新的事件日志写到 spark.eventLog.dir 参数配置的目录中,这时候对应 Spark 作业的 event logs 将可能进行 compact 操作。

•event logs 的 Compaction 操作可能会删除一些没用的事件日志,关于删除的逻辑请看下一小结。这样经过 Compaction 操作之后,新生成的压缩文件大小将会变小。

•一个 Spark 作业最多只会有一个 Compact 文件,文件的后缀是 .compact。已经有 compact 之后的合并文件在下一次进行 compact 的时候会被读出来和需要被 compact 的文件再一次合并,然后写到新的 compact 文件里。

•已经被选中进行 compact 的 event logs 在执行完 compact 之后会被删除。

核心思想

整个 event logs 滚动项目应该可以大致分为两个阶段:

•第一个阶段就是支持 event logs 滚动以及 event logs Compaction,这个在 SPARK-28594 里面,已经合并到 Spark 3.0 代码中。

•第二个阶段是采用 AppStatusListener 使用的方法,即把 event logs 持久化到底层的 KVStore 中,并支持从 KVStore 把 event logs restore 出来,这个可以参见 SPARK-28870,这个还在开发中。

阶段一

支持 event log 滚动的隐层含义是支持删除旧的事件日志,要不然光支持滚动不支持删除,只是解决了单个 event log 文件的大小,解决不了整个作业 event log 总和大小。

为了保证删除旧的事件日志之后 event logs 仍然可以被 Spark 历史服务器重放,我们需要定义出哪些事件日志是可以删除的。

拿 Streaming 作业来说,每个批次都是运行不同的作业。如果我们想删除一些事件日志,在大多数情况下,我们都会保存最近一些批次作业的事件日志,因为这些事件日志有助于我们分析刚刚遇到的问题。换句话说,删除那些比较旧的作业对应的事件日志是比较安全的,而且是比较可行的。这个在 SQL 查询的作业来说一样是适用的。

目前 Spark 在内存中会维护一些 liveExecutors、liveRDDs、liveJobs、liveStages 以及 liveTasks 等信息,当 Spark 历史服务器触发 compact 操作的时候,会读取需要 compact 的事件日志文件, 然后根据前面的 liveExecutors、liveRDDs、liveJobs、liveStages 以及 liveTasks 等信息判断哪些事件需要删除,哪些事件需要保留。满足 EventFilter 定义的 Event 会被保留,不满足的就删除,具体可以参见 EventFilter 的 applyFilterToFile 方法实现。

阶段二

AppStatusListener 会利用外部 KVStore 来存储事件日志,所以社区建议利用现有特性在底层 KVStore 中保留最多数量的 Jobs、Stages 以及 SQL 执行。为了存储对象到 KVStore 以及从 KVStore 恢复对象,社区采用的方法是将 KVStore 中存储的对象 dump 到一个文件中,这个称为 snapshot。

从空间使用的角度来看,这个想法非常有效,因为在 POC 中,只需 5MB 内存就可以将 KVStore 中的数据 dump 到文件中,其中回放了8.4GB 的事件日志。结果看起来很令人惊讶,但是很有意义,根据这个机制,在大多数情况下,dump 数据到文件需要的内存大小不太可能发生显著变化。

需要注意的是,快照里面的内容与当前事件日志文件的内容是不同的。因为这个快照文件是从 KVStore dump 出来的,这些对象不会按创建的顺序写入。我们可以压缩这些对象以节省空间和 IO 成本。在分析某个问题时,新产生的事件日志可能没什么用,这就需要读取和操作之前的事件日志文件。为了支持这种情况,需要将基本的 listener events 编写为原始格式,然后滚动事件日志文件,然后再将旧的事件日志保存到快照中,两种格式的文件共存。这样就满足我们之前的需求。由于快照的存在,事件日志的总体大小不会无限增长。

新的方案中 event log 是如何存储的呢

之前每个 Spark 作业的 event log 都是保存在单个文件里面,如果事件日志没有完成,会使用 .inprogress 后缀表示。新的 event log 方案会为每个 Spark 作业创建一个目录来保存,因为每个 Spark 作业可能会生成多个事件日志文件。事件日志的文件夹名称格式为:eventlog_v2_appId(_)。在事件日志文件夹里面存储的是对应作业的事件日志,日志文件名称格式为:events__(_)(.)。

为了说明,我这里进行了一些测试,/data/iteblog/eventlogs 这个目录就是 spark.eventLog.dir 参数设置的值,下面是这个目录下的内容:

iteblog@www.iteblog.com:/data/iteblog/eventlogs|
⇒  ll
total 0
drwxrwx---  15 iteblog  wheel   480B  3  9 14:26 eventlog_v2_local-1583735123583
drwxrwx---   7 iteblog  wheel   224B  3  9 14:50 eventlog_v2_local-1583735259373
​
iteblog@www.iteblog.com:/data/iteblog/eventlogs/eventlog_v2_local-1583735259373|
⇒  ll
total 416
-rw-r--r--  1 iteblog  wheel     0B  3  9 14:27 appstatus_local-1583735259373.inprogress
-rwxrwx---  1 iteblog  wheel    64K  3  9 14:50 events_2_local-1583735259373.compact
-rwxrwx---  1 iteblog  wheel   102K  3  9 14:50 events_3_local-1583735259373
-rwxrwx---  1 iteblog  wheel   374B  3  9 14:50 events_4_local-1583735259373
​

可以看到,当 Spark 作业还没有完成的时候,会存在一个 appstatus_local-1583735259373.inprogress 的空文件,真正的事件日志是写到 events_x_local-1583735259373 文件里面。

这里再多说一下,Spark 还使用 HDFS 的 EC (erasure coding,参见过往记忆大数据的 Hadoop 3.0 纠删码(Erasure Coding):节省一半存储空间功能来进一步节省 event log 的大小。不过从 Spark 3.0 开始,默认写 event log 不启用 EC,原因是 HDFS EC 的 hflush() 或 hsync() 实现是不做任何操作的,也就意味着如果你的应用程序不完成,那么你在磁盘是看不到数据的。不过你如果实在需要 EC 功能,在 Spark 3.0 可以通过 spark.eventLog.allowErasureCoding 参数启用,具体参见 SPARK-25855


本文转载自公众号:过往记忆大数据
原文链接:https://mp.weixin.qq.com/s/Qb_g66MQMUopyVSfnhU-qw


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
存储 分布式计算 Kubernetes
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
228 0
|
8天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
36 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
30天前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
58 0
|
30天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
38 0
|
30天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
75 0
|
10天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
33 6
|
8天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
40 2
|
8天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
34 1
|
9天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
40 1
|
20天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
31 1