Apache Airflow Installation and Deployment

Environment dependencies: CentOS 7

Component   Version
Python      2.7.5
Airflow     1.10.5

Python dependency libraries:

(airflow) [bigdata@carbondata airflow]$ pip list
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
Package                Version    
---------------------- -----------
alembic                1.1.0      
apache-airflow         1.10.5     
apispec                2.0.2      
attrs                  19.1.0     
Babel                  2.7.0      
cached-property        1.5.1      
certifi                2019.6.16  
chardet                3.0.4      
Click                  7.0        
colorama               0.4.1      
colorlog               4.0.2      
configparser           3.5.3      
croniter               0.3.30     
dill                   0.2.9      
docutils               0.15.2     
dumb-init              1.2.2      
enum34                 1.1.6      
Flask                  1.1.1      
Flask-Admin            1.5.3      
Flask-AppBuilder       1.13.1     
Flask-Babel            0.12.2     
Flask-Caching          1.3.3      
Flask-JWT-Extended     3.22.0     
Flask-Login            0.4.1      
Flask-OpenID           1.2.5      
Flask-SQLAlchemy       2.4.0      
flask-swagger          0.2.13     
Flask-WTF              0.14.2     
funcsigs               1.0.0      
functools32            3.2.3.post2
future                 0.16.0     
futures                3.3.0      
gunicorn               19.9.0     
idna                   2.8        
iso8601                0.1.12     
itsdangerous           1.1.0      
Jinja2                 2.10.1     
json-merge-patch       0.2        
jsonschema             3.0.2      
lazy-object-proxy      1.4.2      
lockfile               0.12.2     
Mako                   1.1.0      
Markdown               2.6.11     
MarkupSafe             1.1.1      
marshmallow            2.19.5     
marshmallow-enum       1.5.1      
marshmallow-sqlalchemy 0.17.2     
monotonic              1.5        
numpy                  1.16.5     
ordereddict            1.1        
pandas                 0.24.2     
pendulum               1.4.4      
pip                    19.2.3     
prison                 0.1.0      
psutil                 5.6.3      
Pygments               2.4.2      
PyJWT                  1.7.1      
pyrsistent             0.15.4     
python-daemon          2.1.2      
python-dateutil        2.8.0      
python-editor          1.0.4      
python-openid          2.2.5      
pytz                   2019.2     
pytzdata               2019.2     
PyYAML                 5.1.2      
requests               2.22.0     
setproctitle           1.1.10     
setuptools             41.2.0     
six                    1.12.0     
SQLAlchemy             1.3.8      
tabulate               0.8.3      
tenacity               4.12.0     
termcolor              1.1.0      
text-unidecode         1.2        
thrift                 0.11.0     
tzlocal                1.5.1      
unicodecsv             0.14.1     
urllib3                1.25.3     
Werkzeug               0.15.6     
wheel                  0.33.6     
WTForms                2.2.1      
zope.deprecation       4.4.0      






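## Install virtualenv (online)
pip install virtualenv

## Install virtualenv (offline)



## Step 1: Create the virtual environment
virtualenv airflow

## Step 2: Activate the virtual environment
source ./airflow/bin/activate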
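## Install Airflow (online)
## The PyPI package is named apache-airflow, which matches the pip list output above.
pip install apache-airflow==1.10.5

## Install Airflow (offline)
pip install --no-index --find-links=/path/to/airflow_1.10.5/package -r airflow-1.10.5.txt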
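
The offline command above assumes that a requirements file (airflow-1.10.5.txt) and a directory of downloaded packages already exist. The article does not show how they were produced. One possible way, sketched here as an assumption, is to run `pip freeze` and `pip download` on a machine that has internet access and the same OS and Python version as the target. The function name and its arguments are illustrative only.

```shell
# Hypothetical helper (not from the original article): build an offline bundle
# on a machine that has internet access and the same OS/Python as the target.
#   $1: requirements file to write (e.g. airflow-1.10.5.txt)
#   $2: directory to download the packages into
build_offline_bundle() {
    local req_file="$1"
    local pkg_dir="$2"

    mkdir -p "$pkg_dir" || return 1
    # Record the exact versions installed in the active virtualenv.
    pip freeze > "$req_file" || return 1
    # Download every pinned package without installing it.
    pip download -d "$pkg_dir" -r "$req_file"
}

# Example (run inside an environment where Airflow is already installed):
#   build_offline_bundle airflow-1.10.5.txt ./airflow_1.10.5/package
```

The resulting directory and requirements file can then be copied to the offline host and passed to `--find-links` and `-r` respectively.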


Initialize the metadata database. Command: airflow initdb

(airflow) [bigdata@carbondata airflow]$ airflow initdb
[2019-09-05 23:16:38,422] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
DB: sqlite:////home/bigdata/airflow/airflow.db
[2019-09-05 23:16:39,766] {db.py:369} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/home/bigdata/airflow/lib/python2.7/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  "Skipping unsupported ALTER for "
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
WARNI [airflow.utils.log.logging_mixin.LoggingMixin] cryptography not found - values will not be stored encrypted.
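
The `DB:` line in the output shows that Airflow created a SQLite database at /home/bigdata/airflow/airflow.db, that is, under the Airflow home directory (`AIRFLOW_HOME`, which defaults to ~/airflow). As a minimal sketch, setting `AIRFLOW_HOME` explicitly before running `airflow initdb` makes that location predictable. The helper name and the check it performs are illustrative, not part of the original article. The final warning means the `cryptography` package is not installed; installing the crypto extra (`pip install 'apache-airflow[crypto]'`) is the usual way to get encrypted storage of connection passwords.

```shell
# Hypothetical helper (not from the original article): initialise the metadata
# database under an explicit AIRFLOW_HOME and confirm the SQLite file exists.
#   $1: directory to use as AIRFLOW_HOME (Airflow itself defaults to ~/airflow)
init_airflow_db() {
    export AIRFLOW_HOME="${1:-$HOME/airflow}"
    mkdir -p "$AIRFLOW_HOME" || return 1

    airflow initdb || return 1

    # With the default SQLite backend the database is a single file.
    if [ -f "$AIRFLOW_HOME/airflow.db" ]; then
        echo "metadata DB ready: $AIRFLOW_HOME/airflow.db"
    else
        echo "airflow.db not found under $AIRFLOW_HOME" >&2
        return 1
    fi
}

# Example:
#   init_airflow_db /home/bigdata/airflow
```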



Start the web server. Command: airflow webserver -p port

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Timeout: 120
Logfiles: - -
[2019-09-05 23:17:32 +0000] [66386] [INFO] Starting gunicorn 19.9.0
[2019-09-05 23:17:32 +0000] [66386] [INFO] Listening at: (66386)
[2019-09-05 23:17:32 +0000] [66386] [INFO] Using worker: sync
[2019-09-05 23:17:32 +0000] [66396] [INFO] Booting worker with pid: 66396
[2019-09-05 23:17:32,886] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:17:32 +0000] [66397] [INFO] Booting worker with pid: 66397
[2019-09-05 23:17:32,958] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:17:33 +0000] [66399] [INFO] Booting worker with pid: 66399
[2019-09-05 23:17:33,123] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:17:33 +0000] [66401] [INFO] Booting worker with pid: 66401
[2019-09-05 23:17:33,227] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:17:33,263] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:17:33,389] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:17:33,691] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:17:33,778] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:18:05 +0000] [66386] [INFO] Handling signal: ttin
[2019-09-05 23:18:05 +0000] [66439] [INFO] Booting worker with pid: 66439
[2019-09-05 23:18:05,321] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:18:05,593] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:18:06 +0000] [66386] [INFO] Handling signal: ttou
[2019-09-05 23:18:06 +0000] [66396] [INFO] Worker exiting (pid: 66396)


Start the scheduler. Command: airflow scheduler

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>
[2019-09-05 23:19:42,831] {dag_processing.py:748} ERROR - Cannot use more than 1 thread when using sqlite. Setting parallelism to 1
[2019-09-05 23:19:50 +0000] [66525] [INFO] Handling signal: ttin
[2019-09-05 23:19:50 +0000] [66593] [INFO] Booting worker with pid: 66593
[2019-09-05 23:19:50,301] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-05 23:19:50,589] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
[2019-09-05 23:19:51 +0000] [66525] [INFO] Handling signal: ttou
[2019-09-05 23:19:51 +0000] [66535] [INFO] Worker exiting (pid: 66535)
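
In the session above the scheduler is started with `&` and shares the terminal with a web server that is also running in the background, which is why gunicorn lines (pid 66525) are interleaved with the scheduler log. A minimal sketch, assuming `nohup` is available, that starts each service detached from the terminal with its own log file and PID file. The function name, the `SERVICE_LOG_DIR` variable, and the file names are hypothetical and not part of Airflow. Airflow 1.10 also accepts a `-D` (daemon) flag on both `airflow webserver` and `airflow scheduler`, which may be preferable.

```shell
# Hypothetical helper (not from the original article): run a command detached
# from the terminal, with its own log file and PID file.
#   $1: short name used for the log and PID files
#   $2...: the command to run
start_detached() {
    local name="$1"
    shift
    # SERVICE_LOG_DIR is an illustrative variable, not an Airflow setting.
    local log_dir="${SERVICE_LOG_DIR:-$HOME/airflow/logs}"
    mkdir -p "$log_dir" || return 1

    # nohup keeps the process alive after the shell exits; all output goes
    # to the per-service log file instead of the terminal.
    nohup "$@" > "$log_dir/$name.out" 2>&1 < /dev/null &
    echo "$!" > "$log_dir/$name.pid"
    echo "started $name (pid $!), log: $log_dir/$name.out"
}

# Example:
#   start_detached webserver airflow webserver -p 8383
#   start_detached scheduler airflow scheduler
```

With separate log files, the "Cannot use more than 1 thread when using sqlite" message from the scheduler is easier to spot; it reflects the SQLite and SequentialExecutor defaults shown in the logs.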


Web UI URL: http://hostname:port (the port is the value passed to airflow webserver -p, 8383 in the example above)
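
Before opening the URL in a browser, it can be useful to confirm from the command line that the web server answers. The following is a minimal sketch, assuming `curl` is installed; the function name, the retry count, and the one-second interval are illustrative choices, not something the article prescribes.

```shell
# Hypothetical helper (not from the original article): poll the web UI until
# it answers or the attempts run out.
#   $1: base URL, e.g. http://localhost:8383
#   $2: number of attempts (default 10), one second apart
wait_for_webserver() {
    local url="$1"
    local attempts="${2:-10}"
    local i=1

    while [ "$i" -le "$attempts" ]; do
        # -s: silent, -f: fail on HTTP errors (>= 400), -o: discard the body
        if curl -sf -o /dev/null "$url"; then
            echo "web UI reachable at $url"
            return 0
        fi
        sleep 1
        i=$((i + 1))
    done

    echo "web UI not reachable at $url after $attempts attempts" >&2
    return 1
}

# Example:
#   wait_for_webserver http://localhost:8383
```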


