Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
背景信息
Apache Livy 通过 REST 接口与 Spark 进行交互,极大简化了 Spark 和应用程序服务器之间的通信复杂度。关于 Livy API,请参见REST API。
前提条件
- 已安装并启动 Airflow 服务,详情请参见Installation of Airflow。
- 已创建工作空间,详情请参见创建工作空间。
操作步骤
步骤一:创建 Gateway 及访问 Token
- 创建 Gateway。
- 进入 Compute 页面。
- 登录E-MapReduce控制台。
- 在左侧导航栏,选择EMR Serverless > Spark。
- 在Spark页面,单击目标工作空间名称。
- 在EMR Serverless Spark页面,单击左侧导航栏中的Compute。
- 在Compute页面,单击Gateway。
- 单击创建Gateway。
- 在创建Gateway页面,输入名称(例如,Livy-gateway),单击创建。
- 创建Token。
- 在Gateway页面,单击Livy-gateway操作列的Token管理。
- 单击创建Token。
- 在创建Token对话框中,输入名称(例如,Livy-token),单击确定。
- 复制Token信息。
重要
Token创建完成后,请务必立即复制新Token的信息,后续不支持查看。如果您的Token过期或遗失,请选择新建Token或重置Token。
步骤二:配置 Apache Airflow
- 执行以下命令,在Apache Airflow环境中安装Apache Livy。
pip install apache-airflow-providers-apache-livy
- 添加Connection。
- UI 方式
在Airflow中找到默认为livy_default的Connection,并对其信息进行修改;或者您也可以在Airflow Web页面手动添加Connection,详情请参见创建Connection。
涉及以下信息:
- Host:填写为Gateway中的Endpoint信息。
- Schema:填写为https。
- Extra:填写JSON字符串,
x-acs-spark-livy-token
为您前一个步骤中复制的Token信息。
{ "x-acs-spark-livy-token": "6ac**********kfu" }
- CLI 方式
通过Airflow CLI执行相应命令来建立Connection,详情请参见创建Connection。
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # Gateway中的Endpoint信息。 "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # 为您前一个步骤中复制的Token信息。 } }'
步骤三:DAG 示例
Airflow的DAG(Directed Acyclic Graph)定义允许您声明任务执行的方式,以下是通过Airflow使用Livy Operator执行Spark任务的示例。
从阿里云OSS获取并执行Python脚本文件。
from datetime import timedelta, datetime from airflow import DAG from airflow.providers.apache.livy.operators.livy import LivyOperator default_args = { 'owner': 'aliyun', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # Initiate DAG livy_operator_sparkpi_dag = DAG( dag_id="livy_operator_sparkpi_dag", default_args=default_args, schedule_interval=None, start_date=datetime(2024, 5, 20), tags=['example', 'spark', 'livy'], catchup=False ) # define livy task with LivyOperator # 请根据实际情况替换file内容。 livy_sparkpi_submit_task = LivyOperator( file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar", class_name="org.apache.spark.examples.SparkPi", args=['1000'], driver_memory="1g", driver_cores=1, executor_memory="1g", executor_cores=2, num_executors=1, name="LivyOperator SparkPi", task_id="livy_sparkpi_submit_task", dag=livy_operator_sparkpi_dag, ) livy_sparkpi_submit_task
说明
file
为您的 Spark 任务对应的文件路径,本文示例为上传至阿里云 OSS 上的 JAR 包spark-examples_2.12-3.3.1.jar的路径,请您根据实际情况替换。上传操作可参见简单上传。
步骤四:查看提交至 EMR 的任务
- 在EMR Serverless Spark页面,单击左侧导航栏中的任务历史。
- 在任务历史的开发任务页签,您可以查看提交的任务。
相关文档
在Apache Airflow中,您也可以选择使用EMR提供的EmrServerlessSparkStartJobRunOperator
接口来提交EMR Serverless Spark任务,提供了一种除了Livy之外的便捷途径。更多详情,请参见通过Apache Airflow向EMR Serverless Spark提交任务。
快速跳转
- EMR Serverless Spark 版官网:https://www.aliyun.com/product/bigdata/serverlessspark
- 产品控制台:https://emr-next.console.aliyun.com/
- 产品文档:https://help.aliyun.com/zh/emr/emr-serverless-spark/
EMR Serverless Spark 在 2024年5月正式开启公测,在公测期间可以免费使用最高 100 CU 计算资源,欢迎试用。如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可钉钉扫描以下二维码加入钉钉群(群号:58570004119)咨询。