Spark in action on Kubernetes - Spark Operator的原理解析

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 在上篇文章中,向大家介绍了如何使用Spark Operator在kubernetes集群上面提交一个计算作业。今天我们会继续使用上篇文章中搭建的Playgroud进行调试与解析,帮助大家更深入的理解Spark Operator的工作原理。

前言

在上篇文章中,向大家介绍了如何使用Spark Operator在kubernetes集群上面提交一个计算作业。今天我们会继续使用上篇文章中搭建的Playground进行调试与解析,帮助大家更深入的理解Spark Operator的工作原理。所以如果没有浏览过上篇文章的同学,可以通过传送门直达,先配置好Playground的环境。

Spark Operator的内部实现

在深入解析Spark Operator之前,我们先补充一些关于kubernetes operator的知识。2018年可以说是kubernetes operator泛滥的一年,各种operator如雨后春笋般出现。operator是扩展kubernetes以及与kubernetes集成的最佳方式之一。在kubernetes的设计理念中,有很重要的一条就是进行了抽象,比如对存储进行抽象、对应用负载进行抽象、对接入层进行抽象等等。每个抽象又对应了各自生命周期管理的controller,开发者提交的Yaml实际上是对抽象终态的描述,而controller会监听抽象的变化、解析并进行处理,最终尝试将状态修正到终态。

4dc0e692f9af92652fffdeb77a19029a.png

那么对于在kubernetes中未定义的抽象该如何处理呢,答案就是operator。一个标准operator通常包含如下几个部分:1. CRD抽象的定义,负责描述抽象所能包含的功能。 2.CRD Controller ,负责解析CRD定义的内容以及生命周期的管理。3.clent-go的SDK,负责提供代码集成时使用的SDK。

3fcd091178b3e42fa2cf48e8567a6520.png

有了这个知识储备,那么我们回过头来看Spark Operator的代码,结构基本就比较明晰了。核心的代码逻辑都在pkg下,其中apis下面主要是定义了不同版本的API;client目录下主要是自动生成的client-go的SDK;crd目录下主要是定义的两个自定义资源sparkapplication和scheduledsparkapplication的结构。controller目录下主要定义的就是这个operator的生命周期管理的逻辑;config目录下主要处理spark config的转换。了解一个Operator能力最快捷的方式,就是查看CRD的定义。在Spark Operator中定义了sparkapplication和scheduledsparkapplication两个CRD,他们之间有什么区别呢?

sparkapplication 是对常规spark任务的抽象,作业是单次运行的,作业运行完毕后,所有的Pod会进入Succeed或者Failed的状态。而scheduledsparkapplication是对离线定时任务的一种抽象,开发者可以在scheduledsparkapplication中定义类似crontab的任务,实现spark离线任务的周期性定时调度。

image.png

上面这张图是Spark中kubernetes的集成图,也就是说当我们通过spark-submit提交作业的时候,会自动生成driver pod与exector pods。那么引入了Spark Operator后,这个流程变成了什么呢?

func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {

    // prometheus的监控指标的暴露
    appToSubmit := app.DeepCopy()
    if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil {
        if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil {
            glog.Error(err)
        }
    }

  // 将CRD中的定义转变为spark-submit的命令
    submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
    if err != nil {
        app.Status = v1beta1.SparkApplicationStatus{
            AppState: v1beta1.ApplicationState{
                State:        v1beta1.FailedSubmissionState,
                ErrorMessage: err.Error(),
            },
            SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
            LastSubmissionAttemptTime: metav1.Now(),
        }
        return app
    }

    // 在operator容器内通过spark-submit提交作业
    submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
    if err != nil {
        app.Status = v1beta1.SparkApplicationStatus{
            AppState: v1beta1.ApplicationState{
                State:        v1beta1.FailedSubmissionState,
                ErrorMessage: err.Error(),
            },
            SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
            LastSubmissionAttemptTime: metav1.Now(),
        }
        c.recordSparkApplicationEvent(app)
        glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
        return app
    }
    
    // 因为Pod的状态也会被Spark Operator进行观测,因此driver pod宕掉会被重新拉起
    // 这是和直接跑spark-submit的一大区别,提供了故障恢复的能力。
    if !submitted {
        // The application may not have been submitted even if err == nil, e.g., when some
        // state update caused an attempt to re-submit the application, in which case no
        // error gets returned from runSparkSubmit. If this is the case, we simply return.
        return app
    }

    glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
    app.Status = v1beta1.SparkApplicationStatus{
        AppState: v1beta1.ApplicationState{
            State: v1beta1.SubmittedState,
        },
        SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
        ExecutionAttempts:         app.Status.ExecutionAttempts + 1,
        LastSubmissionAttemptTime: metav1.Now(),
    }
    c.recordSparkApplicationEvent(app)

    // 通过service暴露spark-ui
    service, err := createSparkUIService(app, c.kubeClient)
    if err != nil {
        glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
    } else {
        app.Status.DriverInfo.WebUIServiceName = service.serviceName
        app.Status.DriverInfo.WebUIPort = service.nodePort
        // Create UI Ingress if ingress-format is set.
        if c.ingressURLFormat != "" {
            ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
            if err != nil {
                glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
            } else {
                app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
                app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
            }
        }
    }
    return app
}

其实到此,我们就已经基本了解Spark Operator做的事情了,首先定义了两种不同的CRD对象,分别对应普通的计算任务与定时周期性的计算任务,然后解析CRD的配置文件,拼装成为spark-submit的命令,通过prometheus暴露监控数据采集接口,创建Service提供spark-ui的访问。然后通过监听Pod的状态,不断回写更新CRD对象,实现了spark作业任务的生命周期管理。

Spark Operator的任务状态机

当我们了解了Spark Operator的设计思路和基本流程后,还需要深入了解的就是sparkapplication的状态都包含哪些,他们之间是如何进行转换的,因为这是Spark Operator对于生命周期管理增强最重要的部分。

image.png

一个Spark的作业任务可以通过上述的状态机转换图进行表示,一个正常的作业任务经历如下几个状态:

New -> Submitted -> Running -> Succeeding -> Completed

而当任务失败的时候会进行重试,若重试超过最大重试次数则会失败。也就是说如果在任务的执行过程中,由于资源、调度等因素造成Pod被驱逐或者移除,Spark Operator都会通过自身的状态机状态转换进行重试。

Spark Operator的状态排查

我们已经知道了Spark Operator最核心的功能就是将CRD的配置转换为spark-submit的命令,那么当一个作业运行不预期的时候,我们该如何判断是哪一层出现的问题呢?首先我们要判断的就是spark-submit时所生成的参数是否是预期的,因为CRD的Yaml配置虽然可以增强表达能力,但是提高了配置的难度与出错的可能性。

func runSparkSubmit(submission *submission) (bool, error) {
    sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
    if !present {
        glog.Error("SPARK_HOME is not specified")
    }
    var command = filepath.Join(sparkHome, "/bin/spark-submit")

    cmd := execCommand(command, submission.args...)
    glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
    if _, err := cmd.Output(); err != nil {
        var errorMsg string
        if exitErr, ok := err.(*exec.ExitError); ok {
            errorMsg = string(exitErr.Stderr)
        }
        // The driver pod of the application already exists.
        if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
            glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
            return false, nil
        }
        if errorMsg != "" {
            return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
        }
        return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
    }

    return true, nil
}

默认情况下Spark Operator会通过glog level=2等级对外输出每次作业提交后转换的提交命令。而默认情况下,glog的level即为2,因此通过检查Spark Operator的Pod日志可以协助开发者快速排查问题。此外在sparkapplication上面也会通过event的方式进行状态的记录,上述状态机之间的转换都会通过event的方式体现在sparkapplication的对象上。掌握这两种方式进行问题排查,可以节省大量排错时间。

最后

使用Spark Operator是在kubernetes上实践spark的最佳方式,和传统的spark-submit相比提供了更多的故障恢复与可靠性保障,并且提供了监控、日志、UI等能力的集成与支持。在下一篇中,会为大家介绍在kubernetes集群中,提交spark作业时的如何使用外部存储存储的最佳实践。

目录
相关文章
|
7月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
6月前
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
6月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
441 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
6月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
728 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
6月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
383 2
|
7月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
528 12
|
7月前
|
开发框架 监控 JavaScript
解锁鸿蒙装饰器:应用、原理与优势全解析
ArkTS提供了多维度的状态管理机制。在UI开发框架中,与UI相关联的数据可以在组件内使用,也可以在不同组件层级间传递,比如父子组件之间、爷孙组件之间,还可以在应用全局范围内传递或跨设备传递。
148 2
|
6月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
6月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
523 29
|
6月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
166 4

相关产品

  • 容器服务Kubernetes版
  • 推荐镜像

    更多