EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务

简介: Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。

Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。


背景信息

Apache Livy 通过 REST 接口与 Spark 进行交互,极大简化了 Spark 和应用程序服务器之间的通信复杂度。关于 Livy API,请参见REST API


前提条件


操作步骤

步骤一:创建 Gateway 及访问 Token

  1. 创建 Gateway。
  1. 进入 Compute 页面。
  1. 登录E-MapReduce控制台
  2. 在左侧导航栏,选择EMR Serverless > Spark
  3. Spark页面,单击目标工作空间名称。
  4. EMR Serverless Spark页面,单击左侧导航栏中的Compute
  1. Compute页面,单击Gateway
  2. 单击创建Gateway
  3. 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建


  1. 创建Token。
  1. Gateway页面,单击Livy-gateway操作列的Token管理
  2. 单击创建Token
  3. 创建Token对话框中,输入名称(例如,Livy-token),单击确定
  4. 复制Token信息。


重要
Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。


步骤二:配置 Apache Airflow

  1. 执行以下命令,在Apache Airflow环境中安装Apache Livy。
pip install apache-airflow-providers-apache-livy


  1. 添加Connection。
  • UI 方式

在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection


涉及以下信息:

  • Host:填写为Gateway中的Endpoint信息。
  • Schema:填写为https
  • Extra:填写JSON字符串,x-acs-spark-livy-token为您前一个步骤中复制的Token信息。
{
  "x-acs-spark-livy-token": "6ac**********kfu"
}


  • CLI 方式

通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"  # 为您前一个步骤中复制的Token信息。
        }
    }'



步骤三:DAG 示例

Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。


从阿里云OSS获取并执行Python脚本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)
# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task


说明

file为您的 Spark 任务对应的文件路径,本文示例为上传至阿里云 OSS 上的 JAR 包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传


步骤四:查看提交至 EMR 的任务

  1. EMR Serverless Spark页面,单击左侧导航栏中的任务历史
  2. 任务历史开发任务页签,您可以查看提交的任务。


相关文档

在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务


快速跳转

  1. EMR Serverless Spark 版官网:https://www.aliyun.com/product/bigdata/serverlessspark
  2. 产品控制台:https://emr-next.console.aliyun.com/
  3. 产品文档:https://help.aliyun.com/zh/emr/emr-serverless-spark/



EMR Serverless Spark 在 2024年5月正式开启公测,在公测期间可以免费使用最高 100 CU 计算资源,欢迎试用。如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可钉钉扫描以下二维码加入钉钉群(群号:58570004119)咨询。

目录
相关文章
|
11月前
|
SQL 分布式计算 Serverless
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
1225 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
|
11月前
|
分布式计算 运维 搜索推荐
立马耀:通过阿里云 Serverless Spark 和 Milvus 构建高效向量检索系统,驱动个性化推荐业务
蝉妈妈旗下蝉选通过迁移到阿里云 Serverless Spark 及 Milvus,解决传统架构性能瓶颈与运维复杂性问题。新方案实现离线任务耗时减少40%、失败率降80%,Milvus 向量检索成本降低75%,支持更大规模数据处理,查询响应提速。
584 57
|
6月前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
637 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
7月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
448 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
DataWorks 数据挖掘 Serverless
阿里云EMR Serverless StarRocks 内容合集
阿里云 EMR StarRocks 提供存算分离架构,支持实时湖仓分析,适用于多种 OLAP 场景。结合 Paimon 与 Flink,助力企业高效处理海量数据,广泛应用于游戏、教育、生活服务等领域,显著提升数据分析效率与业务响应速度。
500 0
|
11月前
|
存储 运维 Serverless
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
1045 69
|
9月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
310 4
|
9月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
670 1
|
存储 安全 数据挖掘
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践
天翼云基于 Apache Doris 成功落地项目已超 20 个,整体集群规模超 50 套,部署节点超 3000 个,存储容量超 15PB
844 2
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践

推荐镜像

更多