看看airflow怎样调度python写的spark任务吧

简介: 看看airflow怎样调度python写的spark任务吧

⭐️ Apache Airflow

Apache Airflow 是一个开源的工作流自动化工具,它用于调度和管理复杂的数据工作流。Airflow 的原理基于有向无环图(DAG)的概念,它通过编写和组织任务的有向图来描述工作流程。

⭐️ BashOperator

Airflow 使用 BashOperator 在 Bash shell 中执行命令。

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo 1",
)

⭐️ airflow 调度 spark

airflow 就是通过 BashOperator 来调度 spark 任务的。

这里是一个简单的示例,展示了如何使用Python编写一个Spark任务,并使用Airflow进行调度。

用python写spark任务之前要安装 pyspark

pip install pyspark -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

首先,我们创建一个Python脚本来运行Spark任务,假设我们的任务是对一个数据集进行简单的 Word Count。

输入文本文件的内容如下:

hello world hello world hi java hi python

我们将 Spark 任务脚本命名为 spark_word_cnt.py:

import time
from pyspark.sql import SparkSession
file_path = '/opt/spark/'
def word_count(input_file, output_file):
    # 创建SparkSession
    spark = SparkSession.builder \
        .appName("WordCount") \
        .getOrCreate()
    # 读取输入文件
    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    # 切分每行文本为单词
    words = lines.flatMap(lambda x: x.split(' '))
    # 计数每个单词出现的次数
    word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # 将结果保存到输出文件
    word_counts.saveAsTextFile(output_file)
    # 关闭SparkSession
    spark.stop()
if __name__ == "__main__":
    input_file = f"{file_path}input.txt"
    output_file = f"{file_path}output{time.time()}"
    word_count(input_file, output_file)

接下来,我们需要创建一个Airflow DAG来调度这个Spark任务。假设我们的DAG命名为 spark_wc.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
file_path = '/opt/spark/'
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'spark_word_count_dag',
    default_args=default_args,
    description='A simple Spark Word Count DAG',
    schedule="0 */1 * * *"
)
spark_task = BashOperator(
    task_id='spark_task',
    bash_command=f'cd {file_path}bin/ && ./spark-submit {file_path}spark_word_cnt.py',
    dag=dag,
)
spark_task

在这个示例中,我们创建了一个Airflow DAG,其中包含一个BashOperator任务,用于运行我们之前编写的Spark任务。

file_path 需要根据情况替换成自己的路径。

Airflow UI的结果如下图所示

详细运行状况如下

调度的结果会在指定的目录下每分钟生产一个 Word Count 结果

本例为演示用,所以 Word Count 结果都是相同的

笔者水平有限,若有不对的地方欢迎评论指正!

相关文章
|
7天前
|
存储 对象存储 Python
Python|玩转 Asyncio 任务处理(1)
Python|玩转 Asyncio 任务处理(1)
21 5
|
4天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之使用spark.sql执行rename分区操作,遇到任务报错退出的情况,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
12天前
|
运维 监控 API
自动化运维实践指南:Python脚本优化服务器管理任务
本文探讨了Python在自动化运维中的应用,介绍了使用Python脚本优化服务器管理的四个关键步骤:1) 安装必备库如paramiko、psutil和requests;2) 使用paramiko进行远程命令执行;3) 利用psutil监控系统资源;4) 结合requests自动化软件部署。这些示例展示了Python如何提升运维效率和系统稳定性。
30 8
|
9天前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
49 1
|
18天前
|
DataWorks 监控 API
DataWorks产品使用合集之在赋值节点上面为什么不能使用全局变量o或odps
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1天前
|
存储 API 数据库
Python|玩转 Asyncio 任务处理(2)
Python|玩转 Asyncio 任务处理(2)
8 0
|
6天前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
14 0
|
5天前
|
机器学习/深度学习 人工智能 前端开发
Python中的模块化编程
【6月更文挑战第17天】Python模块化编程与软件架构设计的关键在于拆分任务到独立模块,提高代码的可维护性、可重用性和可扩展性。例如,学生管理系统可分解为录入、查询和删除模块。MVC和MVVM架构模式有助于组织代码,而微服务和函数式编程将在未来发展中扮演重要角色。通过示例代码,读者能学习如何实现这些概念,提升项目开发效率和质量。
153 57
|
12天前
|
测试技术 虚拟化 云计算
GitHub高赞!速通Python编程基础手册,被玩出花了!
随着云时代的来临,Python 语言越来越被程序开发人员喜欢和使用,因为其不仅简单易学,而且还有丰富的第三方程序库和相应完善的管理工具。 从命令行脚本程序到 GUI程序,从图形技术到科学计算,从软件开发到自动化测试,从云计算到虚拟化,所有这些领域都有 Python 的身影。 今天给小伙伴们分享的这份手册采用以任务为导向的编写模式,全面地介绍了 Python 编程基础及其相关知识的应用,讲解了如何利用 Python 的知识解决部分实际问题。
GitHub高赞!速通Python编程基础手册,被玩出花了!
|
2天前
|
数据挖掘 数据处理 Python
Python编程入门:从基础到实践
【6月更文挑战第26天】这篇文章引导读者逐步学习Python编程,从基础语法如变量、数据类型(整数、浮点数、字符串)到条件语句、循环(if/for/while),再到函数定义和模块导入。通过实例展示了Python在文本处理、数据分析(使用pandas)和Web开发(使用Flask)的应用。学习Python能为初学者开启更广阔的技术领域,如面向对象编程、并发和网络编程等。