EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。

Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。


背景信息

Apache Livy 通过 REST 接口与 Spark 进行交互,极大简化了 Spark 和应用程序服务器之间的通信复杂度。关于 Livy API,请参见REST API


前提条件


操作步骤

步骤一:创建 Gateway 及访问 Token

  1. 创建 Gateway。
  1. 进入 Compute 页面。
  1. 登录E-MapReduce控制台
  2. 在左侧导航栏,选择EMR Serverless > Spark
  3. Spark页面,单击目标工作空间名称。
  4. EMR Serverless Spark页面,单击左侧导航栏中的Compute
  1. Compute页面,单击Gateway
  2. 单击创建Gateway
  3. 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建


  1. 创建Token。
  1. Gateway页面,单击Livy-gateway操作列的Token管理
  2. 单击创建Token
  3. 创建Token对话框中,输入名称(例如,Livy-token),单击确定
  4. 复制Token信息。


重要
Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。


步骤二:配置 Apache Airflow

  1. 执行以下命令,在Apache Airflow环境中安装Apache Livy。
pip install apache-airflow-providers-apache-livy


  1. 添加Connection。
  • UI 方式

在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection


涉及以下信息:

  • Host:填写为Gateway中的Endpoint信息。
  • Schema:填写为https
  • Extra:填写JSON字符串,x-acs-spark-livy-token为您前一个步骤中复制的Token信息。
{
  "x-acs-spark-livy-token": "6ac**********kfu"
}


  • CLI 方式

通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection

airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"  # 为您前一个步骤中复制的Token信息。
        }
    }'



步骤三:DAG 示例

Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。


从阿里云OSS获取并执行Python脚本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)
# define livy task with LivyOperator
# 请根据实际情况替换file内容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task


说明

file为您的 Spark 任务对应的文件路径,本文示例为上传至阿里云 OSS 上的 JAR 包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传


步骤四:查看提交至 EMR 的任务

  1. EMR Serverless Spark页面,单击左侧导航栏中的任务历史
  2. 任务历史开发任务页签,您可以查看提交的任务。


相关文档

在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务


快速跳转

  1. EMR Serverless Spark 版官网:https://www.aliyun.com/product/bigdata/serverlessspark
  2. 产品控制台:https://emr-next.console.aliyun.com/
  3. 产品文档:https://help.aliyun.com/zh/emr/emr-serverless-spark/



EMR Serverless Spark 在 2024年5月正式开启公测,在公测期间可以免费使用最高 100 CU 计算资源,欢迎试用。如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可钉钉扫描以下二维码加入钉钉群(群号:58570004119)咨询。

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
3月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
45 1
|
19天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
51 4
|
3月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
51 3
|
18天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
99 61
|
3月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
46 2
|
21天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
112 2
|
2月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
147 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
1月前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
54 0
|
3月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
当今社会,物联网技术的发展带来了许多繁琐的挑战,尤其是在数据库管理系统领域,比如实时整合海量数据、处理流中的事件以及处理数据的安全性。例如,应用于智能城市的基于物联网的交通传感器可以实时生成大量的交通数据。据估计,未来5年,物联网设备的数量将达数万亿。物联网产生大量的数据,包括流数据、时间序列数据、RFID数据、传感数据等。要有效地管理这些数据,就需要使用数据库。数据库在充分处理物联网数据方面扮演着非常重要的角色。因此,适当的数据库与适当的平台同等重要。由于物联网在世界上不同的环境中运行,选择合适的数据库变得非常重要。 原创文字,IoTDB 社区可进行使用与传播 一、什么是IoTDB 我
175 9
Apache IoTDB进行IoT相关开发实践
|
3月前
|
SQL 运维 分布式计算
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
47 1
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决

推荐镜像

更多