Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 【2月更文挑战第7天】Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)

来自官网的介绍:https://airflow.apache.org/ Airflow™是一个由社区创建的平台,以编程方式编写,调度和监控工作流。这个快速入门指南将帮助您在本地机器上引导一个独立的Airflow实例。如果您按照下面的说明安装,Airflow的安装是很简单的。使用约束文件来实现可重复的安装,因此建议使用pip和约束文件。

1.环境变量设置

Airflow需要一个主目录,默认使用~/airflow,但如果您喜欢,可以设置一个不同的位置。AIRFLOW_HOME环境变量用于通知Airflow所需的位置。设置环境变量的这一步应该在安装Airflow之前完成,以便安装过程知道在哪里存储必要的文件。

export AIRFLOW_HOME=~/airflow

2.使用约束文件进行安装

官网给出的文件内容:

AIRFLOW_VERSION=2.7.2

# Extract the version of Python you have installed. If you're currently using a Python version that is not supported by Airflow, you may want to set this manually.
# See above for supported versions.
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 2.7.2 with python 3.8: https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

本次使用虚拟环境进行安装:

# 创建并切换到airflow虚拟环境
conda create -n airflow python=3.8
conda activate airflow

创建约束文件airflowInstall.sh添加官网给出的内容:

AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

执行文件即可。也可以使用pip进行安装:

pip install "apache-airflow==2.7.2"

查询版本:

airflow version

3.启动单机版

3.1 快速启动

该命令初始化数据库、创建用户并启动所有组件。

airflow standalone

# 启动成功标志
standalone | Airflow is ready
standalone | Login with username: admin  password: ZUUNtd9ppZZTQuqy
standalone | Airflow Standalone is for development purposes only. Do not use this in production!

3.2 分步骤启动

如果您想手动运行Airflow的各个部分,而不是使用一体化的独立命令,您可以运行:

  1. 该命令用于执行数据库迁移。在使用 Airflow 之前,你需要初始化数据库结构。db migrate 命令会根据你的配置文件创建数据库表格,以便存储任务调度、任务实例、DAG(Directed Acyclic Graph,有向无环图)等信息。运行后的信息:
airflow db migrate

初始化数据库后的输出信息:

DB: sqlite:////root/airflow/airflow.db
Performing upgrade to the metadata database sqlite:////root/airflow/airflow.db
[2023-10-19T14:21:37.687+0800] {
   migration.py:213} INFO - Context impl SQLiteImpl.
[2023-10-19T14:21:37.688+0800] {
   migration.py:216} INFO - Will assume non-transactional DDL.
[2023-10-19T14:21:37.690+0800] {
   db.py:1620} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [unusual_prefix_911b7e3bced5159145cb88698226ecde6e08c7be_example_kubernetes_executor] The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
WARNI [unusual_prefix_008dd7238a3787d68b758fe337b9f566c5014ba3_tutorial_taskflow_api_virtualenv] The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.
WARNI [unusual_prefix_db2b4614a7fb1ba43706f0a1f2be91e808476bfa_example_python_operator] The virtalenv_python example task requires virtualenv, please install it.
WARNI [unusual_prefix_5624127e5a8d9c88ab5a41d62ecf92869309dd74_example_local_kubernetes_executor] Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
  File "/root/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 37, in <module>
    from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
WARNI [unusual_prefix_5624127e5a8d9c88ab5a41d62ecf92869309dd74_example_local_kubernetes_executor] Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
WARNI [unusual_prefix_f16a910b73b9eed67cbb95faa136bc7fd6c14eb6_workday] Could not import pandas. Holidays will not be considered.
Database migrating done!

2.该命令用于创建 Airflow 的用户。在这个例子中,它创建了一个名为 "admin" 的用户,具有管理员角色(Admin),并提供了一些用户信息,如名字、姓氏、电子邮件等。

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

airflow users create \
    --username test \
    --firstname te \
    --lastname st \
    --role Admin \
    --email testman@superhero.org

Password:
Repeat for confirmation:
[2023-10-19T15:08:26.070+0800] {
   manager.py:211} INFO - Added user %s
User "test" created with role "Admin"

3.该命令启动 Airflow 的 Web 服务器。Web 服务器提供了一个用户界面,你可以通过浏览器访问。--port 8080 选项指定了 Web 服务器监听的端口号,这里是 8080。你可以通过访问 http://localhost:8080 来打开 Airflow Web UI。

airflow webserver --port 8080 -D

4.该命令启动 Airflow 的调度器。调度器负责按照你的 DAG(工作流)定义定期运行任务。它会检查定义的任务调度时间,然后触发相应的任务实例。调度器是 Airflow 中关键的组件之一,确保任务按照计划执行。

airflow scheduler -D

-D: 表示以守护进程(daemon)模式运行。以守护进程模式运行意味着该进程将在后台持续运行,而不占用当前终端。

3.3 启动后

在运行这些命令后,Airflow将创建$AIRFLOW_HOME文件夹,并创建默认值为Airflow .cfg的文件,这将使您快速运行。您可以使用环境变量覆盖默认值,请参见配置参考:https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html。您可以在$AIRFLOW_HOME/airflow.cfg中检查该文件,或者通过Admin->Configuration菜单中的UI检查该文件。如果由systemd启动webserver的PID将存储在$AIRFLOW_HOME/airflow-webserver.pid/run/airflow/webserver.pid文件中。

files-0.jpg

3.4 服务启动停止脚本

感谢 https://blog.csdn.net/weixin_45417821/article/details/128729413 的分享,脚本airflow-service.sh内容:

#!/bin/bash
case $1 in
"start"){
   
    echo " --------start airflow-------"
    conda activate airflow;airflow webserver -p 8080 -D;airflow scheduler -D;conda deactivate
};;
"stop"){
   
    echo " --------stop airflow-------"
    ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
};;
esac

脚本的执行环境为非虚拟环境也就是Linux本Lin,为何要使用source进行执行小伙伴儿们可以自行学习啊:

# 启动
source ./airflow-service.sh start
# 停止
source ./airflow-service.sh stop

4.访问

4.1 登录

在浏览器中访问localhost:8080,并使用终端显示的管理员帐户详细信息登录。

login-0.jpg

4.2 测试

在主页中启用example_bash_operatorDAG。

example-0.jpg
页面的两条信息说明:

  • 开箱即用,Airflow使用SQLite数据库,由于使用此数据库后端不可能实现并行化,因此您应该很快就能适应该数据库。它与SequentialExecutor一起工作,后者只按顺序运行任务实例。虽然有很多限制,但它允许您快速启动和运行,并了解UI和命令行实用程序。
  • 当您将Airflow扩展并部署到生产环境中时,您还需要从我们在这里使用的独立命令转移到单独运行组件。您可以在生产部署中了解更多信息:https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/production-deployment.html

任务的详情:

example-1.jpg
下面是几个将触发几个任务实例的命令。当您运行下面的命令时,您应该能够在example_bash_operator DAG中看到作业的状态变化。

# 1.run your first task instance
airflow tasks test example_bash_operator runme_0 2015-01-01

# 执行成功标志
[2023-10-19T14:15:55.666+0800] {
   taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=example_bash_operator, task_id=runme_0, execution_date=20150101T000000, start_date=20231019T061426, end_date=20231019T061555

# 2.run a backfill over 2 days
airflow dags backfill example_bash_operator \
    --start-date 2015-01-01 \
    --end-date 2015-01-02

# 执行成功标志
[2023-10-19T14:17:59.128+0800] {
   backfill_job_runner.py:412} INFO - [backfill progress] | finished run 2 of 2 | tasks waiting: 0 | succeeded: 10 | running: 0 | failed: 0 | skipped: 4 | deadlocked: 0 | not ready: 0
[2023-10-19T14:17:59.136+0800] {
   backfill_job_runner.py:971} INFO - Backfill done for DAG <DAG: example_bash_operator>. Exiting.

5.更新日志

  • 2024-02-21 修改启动脚本airflow-service.sh内容,及调用脚本。
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
6月前
|
SQL 分布式计算 Hadoop
Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
【2月更文挑战第6天】Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
475 0
|
消息中间件 存储 监控
五分钟快速了解Airflow工作流
简介 Airflow是一个以编程方式创作、调度和监控工作流的平台。 使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。 当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
|
3月前
|
资源调度 运维 Devops
阿里云云效操作报错合集之yarn install时报错,是什么导致的
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
204 0
|
5月前
|
监控 数据处理 调度
使用Apache Airflow进行工作流编排:技术详解与实践
【6月更文挑战第5天】Apache Airflow是开源的工作流编排平台,用Python定义复杂数据处理管道,提供直观DAGs、强大调度、丰富插件、易扩展性和实时监控。本文深入介绍Airflow基本概念、特性,阐述安装配置、工作流定义、调度监控的步骤,并通过实践案例展示如何构建数据获取、处理到存储的工作流。Airflow简化了复杂数据任务管理,适应不断发展的数据技术需求。
1101 3
|
6月前
|
存储 数据安全/隐私保护 Docker
Airflow安装
Airflow安装
139 0
|
XML 分布式计算 数据可视化
本地部署 zeppelin 0.10.1
本地部署 zeppelin 0.10.1
221 0
|
消息中间件 SQL 运维
【大数据开发运维解决方案】hadoop+kylin安装及官方cube/steam cube案例文档
对于hadoop+kylin的安装过程在上一篇文章已经详细的写了, 请读者先看完上一篇文章再看本本篇文章,本文主要大致介绍kylin官官方提供的常规批量cube创建和kafka+kylin流式构建cube(steam cube)的操作过程,具体详细过程请看官方文档。
【大数据开发运维解决方案】hadoop+kylin安装及官方cube/steam cube案例文档
|
消息中间件 Java Kafka
阿里云实时计算Flink部署运行pyflink脚本
最近有个需求需要使用py模型输出数据,本地已完成测试,需要完成在实时计算Flink上完成部署及运行
2376 2
阿里云实时计算Flink部署运行pyflink脚本
|
存储 关系型数据库 MySQL
电商项目之 Azkaban 安装(Exec 参数配置)|学习笔记
快速学习电商项目之 Azkaban 安装(Exec 参数配置)
电商项目之 Azkaban 安装(Exec 参数配置)|学习笔记