⭐️ Apache Airflow
Apache Airflow 是一个开源的工作流自动化工具,它用于调度和管理复杂的数据工作流。Airflow 的原理基于有向无环图(DAG)的概念,它通过编写和组织任务的有向图来描述工作流程。
⭐️ BashOperator
Airflow 使用 BashOperator 在 Bash shell 中执行命令。
run_this = BashOperator( task_id="run_after_loop", bash_command="echo 1", )
⭐️ airflow 调度 spark
airflow 就是通过 BashOperator 来调度 spark 任务的。
这里是一个简单的示例,展示了如何使用Python编写一个Spark任务,并使用Airflow进行调度。
用python写spark任务之前要安装 pyspark
pip install pyspark -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
首先,我们创建一个Python脚本来运行Spark任务,假设我们的任务是对一个数据集进行简单的 Word Count。
输入文本文件的内容如下:
hello world hello world hi java hi python
我们将 Spark 任务脚本命名为 spark_word_cnt.py:
import time from pyspark.sql import SparkSession file_path = '/opt/spark/' def word_count(input_file, output_file): # 创建SparkSession spark = SparkSession.builder \ .appName("WordCount") \ .getOrCreate() # 读取输入文件 lines = spark.read.text(input_file).rdd.map(lambda r: r[0]) # 切分每行文本为单词 words = lines.flatMap(lambda x: x.split(' ')) # 计数每个单词出现的次数 word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) # 将结果保存到输出文件 word_counts.saveAsTextFile(output_file) # 关闭SparkSession spark.stop() if __name__ == "__main__": input_file = f"{file_path}input.txt" output_file = f"{file_path}output{time.time()}" word_count(input_file, output_file)
接下来,我们需要创建一个Airflow DAG来调度这个Spark任务。假设我们的DAG命名为 spark_wc.py:
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import timedelta file_path = '/opt/spark/' default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(1), 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'spark_word_count_dag', default_args=default_args, description='A simple Spark Word Count DAG', schedule="0 */1 * * *" ) spark_task = BashOperator( task_id='spark_task', bash_command=f'cd {file_path}bin/ && ./spark-submit {file_path}spark_word_cnt.py', dag=dag, ) spark_task
在这个示例中,我们创建了一个Airflow DAG,其中包含一个BashOperator任务,用于运行我们之前编写的Spark任务。
file_path 需要根据情况替换成自己的路径。
Airflow UI的结果如下图所示
详细运行状况如下
调度的结果会在指定的目录下每分钟生产一个 Word Count 结果
本例为演示用,所以 Word Count 结果都是相同的
笔者水平有限,若有不对的地方欢迎评论指正!