[AirFlow]AirFlow使用指南一 安装与启动

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/76592793 1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/76592793

1. 安装

通过pip安装:

xiaosi@yoona:~$ pip install airflow

如果速度比较慢,可以使用下面提供的源进行安装:

xiaosi@yoona:~$ pip install -i https://pypi.tuna.tsinghua.edu.cn/simple airflow

如果出现下面提示,表示你的airflow安装成功了:

Successfully installed airflow alembic croniter dill flask flask-admin flask-cache flask-login flask-swagger flask-wtf funcsigs future gitpython gunicorn jinja2 lxml markdown pandas psutil pygments python-daemon python-dateutil python-nvd3 requests setproctitle sqlalchemy tabulate thrift zope.deprecation Mako python-editor click itsdangerous Werkzeug wtforms PyYAML ordereddict gitdb2 MarkupSafe pytz numpy docutils setuptools lockfile six python-slugify idna urllib3 certifi chardet smmap2 Unidecode
Cleaning up...

安装完成之后我的默认安装在~/.local/bin目录下

2. 配置

如果不修改路径,默认的配置为~/airflow

永久修改环境变量

echo "export AIRFLOW_HOME=/home/xiaosi/opt/airflow" >> /etc/profile
source /etc/profile

为了便于操作方便,进行如下配置:

echo "export PATH=/home/xiaosi/.local/bin:$PATH" >> /etc/profile
source /etc/profile

3. 初始化

初始化数据库:

xiaosi@yoona:~$ airflow initdb
[2017-08-02 16:39:22,319] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-08-02 16:39:22,432] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-08-02 16:39:22,451] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
DB: sqlite:////home/xiaosi/opt/airflow/airflow.db
[2017-08-02 16:39:22,708] {db.py:287} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/home/xiaosi/.local/lib/python2.7/site-packages/alembic/util/messaging.py:69: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  warnings.warn(msg)
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_isntance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
Done.

运行上述命令之后,会在$AIRFLOW_HOME目录下生成如下文件:

xiaosi@yoona:~/opt/airflow$ ll
总用量 88
drwxrwxr-x  2 xiaosi xiaosi  4096  82 16:39 ./
drwxrwxr-x 26 xiaosi xiaosi  4096  731 13:56 ../
-rw-rw-r--  1 xiaosi xiaosi 11424  82 16:38 airflow.cfg
-rw-r--r--  1 xiaosi xiaosi 58368  82 16:39 airflow.db
-rw-rw-r--  1 xiaosi xiaosi  1554  82 16:38 unittests.cfg

4. 修改默认数据库

找到$AIRFLOW_HOME/airflow.cfg配置文件,进行如下修改:

sql_alchemy_conn = mysql://root:root@localhost:3306/airflow

备注

数据库用户名与密码均为root,airflow使用的数据库为airflow.使用如下命令创建对应的数据库:

mysql> create database airflow;
Query OK, 1 row affected (0.00 sec)

重新初始化服务器数据库:

xiaosi@yoona:~$ airflow initdb

出现了如下错误:

xiaosi@yoona:~$ airflow initdb
Traceback (most recent call last):
  File "/home/xiaosi/.local/bin/airflow", line 17, in <module>
    from airflow import configuration
  File "/home/xiaosi/.local/lib/python2.7/site-packages/airflow/__init__.py", line 30, in <module>
    from airflow import settings
  File "/home/xiaosi/.local/lib/python2.7/site-packages/airflow/settings.py", line 159, in <module>
    configure_orm()
  File "/home/xiaosi/.local/lib/python2.7/site-packages/airflow/settings.py", line 147, in configure_orm
    engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
  File "/home/xiaosi/.local/lib/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 387, in create_engine
    return strategy.create(*args, **kwargs)
  File "/home/xiaosi/.local/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 80, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/home/xiaosi/.local/lib/python2.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 110, in dbapi
    return __import__('MySQLdb')
ImportError: No module named MySQLdb

解决方案:

MySQL是最流行的开源数据库之一,但在Python标准库中并没有集成MySQL接口程序,MySQLdb是一个第三方包,需独立下载并安装。

sudo apt-get install python-mysqldb

再次初始化:

xiaosi@yoona:~$ airflow initdb
[2017-08-02 17:22:21,169] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-08-02 17:22:21,282] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-08-02 17:22:21,302] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
DB: mysql://root:***@localhost:3306/airflow
[2017-08-02 17:22:21,553] {db.py:287} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_isntance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
Done.

查看一下airflow数据库中做了哪些操作:

mysql> use airflow;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| dag_stats         |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_fail         |
| task_instance     |
| users             |
| variable          |
| xcom              |
+-------------------+
19 rows in set (0.00 sec)

5. 启动

通过如下命令就可以启动后台管理界面,默认访问localhost:8080即可:

xiaosi@yoona:~$ airflow webserver
[2017-08-02 17:25:31,961] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-08-02 17:25:32,075] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-08-02 17:25:32,095] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/

/home/xiaosi/.local/lib/python2.7/site-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2017-08-02 17:25:32,469] [9703] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================            
[2017-08-02 17:25:33,052] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-08-02 17:25:33,156] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-08-02 17:25:33,179] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-08-02 17:25:33 +0000] [9706] [INFO] Starting gunicorn 19.3.0
[2017-08-02 17:25:33 +0000] [9706] [INFO] Listening at: http://0.0.0.0:8080 (9706)
[2017-08-02 17:25:33 +0000] [9706] [INFO] Using worker: sync
...

呈现出的主界面如下: img

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
SQL 分布式计算 Hadoop
Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
【2月更文挑战第6天】Azkaban【基础 01】核心概念+特点+Web界面+架构+Job类型(一篇即可入门Azkaban工作流调度系统)
557 0
|
关系型数据库 MySQL Linux
airflow安装教程(local模式)
airflow安装教程(local模式)
airflow安装教程(local模式)
|
4月前
|
分布式计算 监控 调度
airflow是什么
Apache Airflow是一个用于调度和监控有依赖任务的工作流平台,它使用Python编程定义任务和工作流,提供了命令行和Web界面工具,支持包括Spark、MR、Hive在内的多种数据处理任务的提交和管理。
153 5
|
7月前
|
存储 数据安全/隐私保护 Docker
Airflow安装
Airflow安装
152 0
|
7月前
|
存储 监控 Linux
Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
【2月更文挑战第7天】Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
630 1
|
SQL 关系型数据库 MySQL
66 Azkaban安装部署
66 Azkaban安装部署
83 0
|
缓存 资源调度 前端开发
📒十分钟快速入门Yarn包管理工具
📒十分钟快速入门Yarn包管理工具
274 4
📒十分钟快速入门Yarn包管理工具
|
存储 缓存 监控
Ansible最佳实践之 AWX 作业创建和启动
写在前面 分享一些 AWX 作业创建和启动的笔记 博文内容涉及: 创建作业模板 涉及相关参数,作业模板角色配置介绍 运行作业模板并测试的Demo 食用方式: 需要了解 Ansible 理解不足小伙伴帮忙指正
362 0
Ansible最佳实践之 AWX 作业创建和启动
|
Linux 数据安全/隐私保护 Docker
Airflow容器化安装
1. 下载容器配置文件 2. 创建用户和用户组(airflow) 3.设置用户airflow为管理员 4. 在airflow用户下,pip 安装celery 和 apache-airflow (python3.8) 5. 初始化镜像 6. 启动容器
Airflow容器化安装
|
分布式计算 数据可视化 数据挖掘
Windows下使用zeppelin、Dockers搭建Flink学习环境
Windows下使用zeppelin、Dockers搭建Flink学习环境
857 0
Windows下使用zeppelin、Dockers搭建Flink学习环境