看看airflow怎样调度python写的spark任务吧

简介: 看看airflow怎样调度python写的spark任务吧

⭐️ Apache Airflow

Apache Airflow 是一个开源的工作流自动化工具,它用于调度和管理复杂的数据工作流。Airflow 的原理基于有向无环图(DAG)的概念,它通过编写和组织任务的有向图来描述工作流程。

⭐️ BashOperator

Airflow 使用 BashOperator 在 Bash shell 中执行命令。

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo 1",
)

⭐️ airflow 调度 spark

airflow 就是通过 BashOperator 来调度 spark 任务的。

这里是一个简单的示例,展示了如何使用Python编写一个Spark任务,并使用Airflow进行调度。

用python写spark任务之前要安装 pyspark

pip install pyspark -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

首先,我们创建一个Python脚本来运行Spark任务,假设我们的任务是对一个数据集进行简单的 Word Count。

输入文本文件的内容如下:

hello world hello world hi java hi python

我们将 Spark 任务脚本命名为 spark_word_cnt.py:

import time
from pyspark.sql import SparkSession
file_path = '/opt/spark/'
def word_count(input_file, output_file):
    # 创建SparkSession
    spark = SparkSession.builder \
        .appName("WordCount") \
        .getOrCreate()
    # 读取输入文件
    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    # 切分每行文本为单词
    words = lines.flatMap(lambda x: x.split(' '))
    # 计数每个单词出现的次数
    word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # 将结果保存到输出文件
    word_counts.saveAsTextFile(output_file)
    # 关闭SparkSession
    spark.stop()
if __name__ == "__main__":
    input_file = f"{file_path}input.txt"
    output_file = f"{file_path}output{time.time()}"
    word_count(input_file, output_file)

接下来,我们需要创建一个Airflow DAG来调度这个Spark任务。假设我们的DAG命名为 spark_wc.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
file_path = '/opt/spark/'
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'spark_word_count_dag',
    default_args=default_args,
    description='A simple Spark Word Count DAG',
    schedule="0 */1 * * *"
)
spark_task = BashOperator(
    task_id='spark_task',
    bash_command=f'cd {file_path}bin/ && ./spark-submit {file_path}spark_word_cnt.py',
    dag=dag,
)
spark_task

在这个示例中,我们创建了一个Airflow DAG,其中包含一个BashOperator任务,用于运行我们之前编写的Spark任务。

file_path 需要根据情况替换成自己的路径。

Airflow UI的结果如下图所示

详细运行状况如下

调度的结果会在指定的目录下每分钟生产一个 Word Count 结果

本例为演示用,所以 Word Count 结果都是相同的

笔者水平有限,若有不对的地方欢迎评论指正!

相关文章
|
5月前
|
供应链 并行计算 算法
1行Python搞定高频任务!26个实用技巧解决日常+进阶需求
本文整理了26个Python极简技巧,涵盖日常高频操作与进阶玩法,助你用最少代码高效解决问题,提升编程效率。适合各阶段Python学习者参考。
202 27
|
11月前
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
数据采集 存储 监控
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
|
12月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
621 0
|
存储 安全 数据可视化
用Python实现简单的任务自动化
本文介绍如何使用Python实现任务自动化,提高效率和准确性。通过三个实用案例展示:1. 使用`smtplib`和`schedule`库自动发送邮件提醒;2. 利用`shutil`和`os`库自动备份文件;3. 借助`requests`库自动下载网页内容。每个案例包含详细代码和解释,并附带注意事项。掌握这些技能有助于个人和企业优化流程、节约成本。
485 3
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
237 18
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
运维 监控 网络安全
自动化运维的崛起:如何利用Python脚本简化日常任务
【10月更文挑战第43天】在数字化时代的浪潮中,运维工作已从繁琐的手工操作转变为高效的自动化流程。本文将引导您了解如何运用Python编写脚本,以实现日常运维任务的自动化,从而提升工作效率和准确性。我们将通过一个实际案例,展示如何使用Python来自动部署应用、监控服务器状态并生成报告。文章不仅适合运维新手入门,也能为有经验的运维工程师提供新的视角和灵感。
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【10月更文挑战第36天】在数字化时代,运维工作的效率和准确性成为企业竞争力的关键。本文将介绍如何通过编写Python脚本来自动化日常的运维任务,不仅提高工作效率,还能降低人为错误的风险。从基础的文件操作到进阶的网络管理,我们将一步步展示Python在自动化运维中的应用,并分享实用的代码示例,帮助读者快速掌握自动化运维的核心技能。
609 3
|
调度 数据库 Python
掌握Python中的异步编程,提升I/O密集型任务的性能
掌握Python中的异步编程,提升I/O密集型任务的性能
182 0

推荐镜像

更多