通过Job Committer保证Mapreduce/Spark任务数据一致性

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 通过对象存储系统普遍提供的Multipart Upload功能,实现的No-Rename Committer在数据一致性和性能方面相对于FileOutputCommitter V1/V2版本均有较大提升,在使用MapRedcue和Spark写入数据到S3/Oss的场景中更加推荐使用。

作者:李呈祥,花名司麟,阿里云智能EMR团队高级技术专家,Apache Hive Committer, Apache Flink Committer,目前主要专注于EMR产品中开源计算引擎的优化工作。


并发地向目标存储系统写数据是分布式任务的一个天然特性,通过在节点/进程/线程等级别的并发写数据,充分利用集群的磁盘和网络带宽,实现高容量吞吐。并发写数据的一个主要需要解决的问题就是如何保证数据一致性的问题,具体来说,需要解决下面列出的各个问题:

1.在分布式任务写数据的过程中,如何保证中间数据对外不可见。

2.在分布式任务正常完成后,保证所有的结果数据同时对外可见。

3.在分布式任务失败时,所有结果数据对外不可见且能正确清理。

4.开启预测执行时,保证多个执行相同任务的task只有一份结果数据在最终结果中。

此外,还要一些作业的异常情况需要处理,例如task失败重试,作业重启等等。Job Committer是MapReduce用来实现分布式写入一致性的保证,通过Job Committer的各种实现,保证MapReduce任务在各种异常场景中数据写出的一致性。Spark支持MapReduce的JobCommitter,同样也是通过JobCommitter实现Spark作业写出数据的一致性。

JobCommitter接口

MapReduce有V1和V2两套API接口,在包名中以mapredmapreduce区分,v1和v2版本的JobCommitter抽象接口基本一致,下面以org.apache.hadoop.mapreduce.OutputCommitter为例介绍主要的接口定义:

Modifier and Type Method and Description
abstract void setupJob(JobContext jobContext)For the framework to setup the job output during initialization.
void commitJob(JobContext jobContext)For committing job's output after successful job completion.
void abortJob(JobContext jobContext, org.apache.hadoop.mapreduce.JobStatus.State state)For aborting an unsuccessful job's output.
boolean isCommitJobRepeatable(JobContext jobContext)Returns true if an in-progress job commit can be retried.
abstract void setupTask(TaskAttemptContext taskContext)Sets up output for the task.
abstract void commitTask(TaskAttemptContext taskContext)To promote the task's temporary output to final output location.
abstract void abortTask(TaskAttemptContext taskContext)Discard the task output.
abstract boolean needsTaskCommit(TaskAttemptContext taskContext)Check whether task needs a commit.
boolean isRecoverySupported(JobContext jobContext)Is task output recovery supported for restarting jobs? If task output recovery is supported, job restart can be done more efficiently.
void recoverTask(TaskAttemptContext taskContext)Recover the task output.

根据接口的调用时机和顺序,我们可以大致梳理出MapReduce任务是如何通过JobCommitter的工作机制。

1.在job初始化时,调用setupJob,进行一些作业级别的初始化工作,例如设置job的工作目录等等。

2.如果已有相同作业正在执行,调用isCommitJobRepeatable判断是否继续。

3.在task初始化时,调用setupTask,进行一些作业级别的初始化工作,例如设置task工作目录,task输出目录等。

4.如果task输出已存在,通过isRecorverySupport判断是否支持recovery,是的话,调用recoverTask,避免task的计算。

5.如果task执行失败,调用abortTask,清理task输出。

6.如果task执行成功,调用commitTask。

7.如果所有task都全部完成,调用commitJob。

8.如果job失败,调用abortJob。

可以看到,JobCommitter的基本机制是基于一种类似于分布式数据库中的两阶段提交协议的方式,task首先commit,主要的工作在task中完成,在appmaster收到所有task成功提交的信息后,进行job commit完成最后的提交工作。通过两阶段提交协议实现数据一致性有两个主要的需求需要满足:

1.在commit job以前,数据对外不可见,且可回退。

2.commit job过程要尽量短,最好是原子操作,较长的commit job过程,中间发生失败的风险较大,一旦失败,会导致数据处于某种中间状态,无法满足数据一致性的要求。

在MapReduce中,FileOutputCommitter是最常使用的一个Job Commiter实现,在写入数据到HDFS上时,完全满足两阶段提交协议的两个要求。

FileOutputCommitter

下面简单介绍FileOutputCommitter主要接口的一些具体实现细节。FileOutputCommitter主要涉及到四个目录:

  • 最终目录:$dest/
  • Job临时目录:$dest/_temporary/$appAttemptId/
  • Task临时目录:$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/
  • Task输出目录:$dest/_temporary/$appAttemptId/$taskAttemptID/

整个JobCommitter执行过程如图所示:

img

1.setupJob:设置Job临时目录。

2.setupTask:确定Task临时目录和输出目录。

3.commitTask:将Task临时目录rename到输出目录。

4.abortTask:清理Task临时目录。

5.commitJob:将Job临时目录中的数据(包含所有Task输出目录中的文件)合并到Job最终目录。

6.abortJob:清理Job临时目录。

根据以上FileOutputCommitter的实现,在可以看到,在commitJob之前,所有mapreduce任务写的数据都在临时目录中,读取Job最终目录不会读到临时数据,在Job执行的任意过程失败,清理临时目录文件即可。FileOutputCommitter在Job执行的过程中,每一个产生的文件需要进行两次Rename操作,第一次是commitTask,在Task中执行,多个节点中执行的task可以并发地进行Rename。第二次是commitJob,MapReduce或者Spark的Job Driver端执行的,是个单点操作。在commitJob时,由于需要将Job临时目录中的文件移动到最终目录,会有一个时间窗口,在过程中失败的话,会导致部分数据对外可见,这个时间窗口随着文件数量的增加也会随之增加。对于HDFS这类分布式文件系统来说,rename是一个十分高效的操作,只涉及到NameNode上相关元数据的修改,所以这个时间窗口非常小,可以满足绝大部分场景的需求。

在对于S3,OSS等公有云上的对象存储系统来说,并不直接支持Rename操作,文件系统级别的Rename操作一般会转换成Copy+Delete操作,这个代价相对于HDFS会大大增加。commitJob是在MapReduce或者Spark的Job Driver端执行的,是个单点操作,虽然有实现线程级别的并发优化,但是在写入S3/OSS的场景中,commitJob的时间窗口会非常长,文件数量较大时,可能达到分钟,甚至小时级别,这对于Job的性能会产生严重的影响,为了解决写S3/OSS等对象存储系统的性能问题,Hadoop社区引入了FileOutputCommitter V2版本。

FileOutputCommitter V2

FileOutputCommitter V2版本整个job commit的过程如下:

img

1.setupJob:设置Job临时目录。

2.setupTask:确定Task临时目录。

3.commitTask:将Task临时目录文件rename到Job最终目录。

4.abortTask:清理Task临时目录。

5.commitJob:无需Rename操作。

6.abortJob:清理Job临时目录。

可以看到在V2版本中,最大的区别是去掉了Task输出目录,在commitTask的时候将文件直接rename到Job最终目录,整个Job Commit过程,对于所有的文件只需进行一次Rename操作,而且Rename操作是在集群节点的所有task上并发执行的,消除了Job Driver单点执行rename的瓶颈。

FileOutputCommitter V2在写入数据到S3/OSS等场景中大大提高了性能,但是由于byPass了Task输出目录,无法保证数据的一致性,在Job执行过程中,部分文件就移动到了Job最终目录。当部分task成功,部分task失败时,也会在最终目录中残留中间文件。

针对写入S3/OSS等的场景,Hadoop社区和各个工业界也都提出了非常多的解决方案,基本的目标是保证数据一致性的前提下,完全避免Rename操作。下面主要介绍S3ACommitter和JindoOssCommitter,分别是hadoop社区和阿里云EMR团队针对S3和OSS实现的Job Committer,主要是基于S3/OSS的Multipart Upload特性实现,基本思想一致,在这里一并介绍。此外,还有Databricks基于DBIO的方案,Netflix的Staging committer方案等等,篇幅有限,这里就不过多介绍了。

对象存储系统的Multipart Upload

除了通过PUT Object接口上传文件到S3/OSS以外,S3/OSS还提供了另外一种上传模式——Multipart Upload。主要应用在文件较大,需要断点上传或者网络不好等场景中,以OSS为例,Multipart Upload上传的流程如下:

1.InitiateMultipartUpload:使用Multipart Upload模式传输数据前,必须先调用该接口来通知OSS初始化一个Multipart Upload事件。指定目标文件地址作为参数,获取一个uploadId用作后续upload使用。

2.UploadPart:初始化一个MultipartUpload之后,可以根据指定的Object名和Upload ID来分块(Part)上传数据。可重复调用uploadPart接口上传不同的分块数据,而且可以并发调用。

3.CompleteMultipartUpload:在将所有数据Part都上传完成后,必须调用CompleteMultipartUpload接口来完成整个文件的MultipartUpload。完成completeMultipartUpload后,文件在oss上对外可见,在completeMultipartUpload返回之前,该文件对外不可见。

4.AbortMultipartUpload:AbortMultipartUpload接口用于终止MultipartUpload事件,在CompleteMultipartUpload之前可随时中止MultipartUpload。

5.ListMultipartUploads:ListMultipartUploads用来列举所有执行中的Multipart Upload事件,即已经初始化但还未Complete或者Abort的Multipart Upload事件。

基于Multipart Upload的No-Rename Committer实现

通过Multipart Upload功能提供的支持,结合S3/Oss文件系统层面的定制支持,可以实现在保证数据一致性前提下无需Rename操作的Job Committer实现,具体的Job Commit流程如下:

img

1.setupJob:设置Job临时目录。

2.setupTask:设置Task临时目录,Task执行过程中写文件使用MultiUpload接口直接写到Job最终目录,在close文件时,不调用CompleteMultipartUpload接口,将所有Upload分块信息记录在Task临时目录的文件中。

3.commitTask:将Task临时目录文件中的多个文件Upload分块信息合并成一个文件,写到Job临时目录。

4.abortTask:清理Task临时目录,使用AbortMultipartUpload接口,abort所有该task写的文件。

5.commitJob:访问Job临时目录中所有的Upload分块信息,调用CompleteMultipartUpload接口,完成所有文件的MultipartUpload。

6.abortJob:调用ListMultipartUploads,abort所有该task写的文件分块,清理Job临时目录。

在Task执行过程中,由于通过Multipart Upload相关接口初始化upload和上传分块数据,但是知道commitJob时,才会调用CompleteMultipartUpload。根据Multipart Upload特性,在调用CompleteMultipartUpload前文件是不可见的,从而保证了数据一致性。同FileOutputCommitter类似,由于有多个文件需要CompleteMultipartUpload,在commitJob时也会有一个可能导致数据不一致的时间窗口。文件的上传过程都已经在task中分布式的完成了,在Job Driver中commitJob时CompleteMultipartUpload是一个非常轻量级的请求,所以这个时间窗口会非常短,失败的可能较低,可以满足绝大部分业务场景的需求。对比FileOutputCommitter V1,在jobCommit时,CompleteMultipartUpload相对于Rename代价小很多,可能导致数据不一致的时间窗口也会少很多。对比FileOutputCommitter V2,V2并不保证数据一致性,JindoOssCommitter可以适用于更多对数据一致性有要求的场景。

性能方面,这种方式分布式的在task中并发写数据到OSS中,并且不需要Rename操作,对比FileOutputCommitter V1/V2分别需要的两次和一次Rename操作,也有大幅的性能提升。

总结

通过对象存储系统普遍提供的Multipart Upload功能,实现的No-Rename Committer在数据一致性和性能方面相对于FileOutputCommitter V1/V2版本均有较大提升,在使用MapRedcue和Spark写入数据到S3/Oss的场景中更加推荐使用。S3ACommitter在Hadoop社区版本的3.1.2中已经可以使用,JindoOssCommitter也在阿里云的EMR环境2.5.0以上版本中默认开启。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
220 3
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
52 1
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
59 1
|
3月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
51 5
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark适合处理哪些任务?
【9月更文挑战第1天】Spark适合处理哪些任务?
218 3
|
4月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
|
5月前
|
分布式计算 监控 Serverless
E-MapReduce Serverless Spark 版测评
E-MapReduce Serverless Spark 版测评
11609 10
|
5月前
|
分布式计算 Serverless Spark
【开发者评测】E-MapReduce Serverless Spark获奖名单
E-MapReduce Serverless Spark获奖名单正式公布!
188 1

相关实验场景

更多