开发者社区> 问答> 正文

Python-通过Airflow DAG进行数据流作业

我正在尝试通过Airflow中的BashOperator使用数据流运行器来执行apap梁管道python文件。我已经知道如何将参数动态传递给python文件。我期待优化参数-避免单独发送所有参数。示例片段:

text_context.py

import sys

def run_awc_orders(*args, **kwargs): print("all arguments -> ", args)

if name == "main": print("all params -> ", sys.argv) run_awc_orders( sys.argv[1], sys.argv[2], sys.argv[3]) my_dag.py

test_DF_job = BashOperator( task_id='test_DF_job', provide_context=True, bash_command="python /usr/local/airflow/dags/test_context.py {{ execution_date }} {{ next_execution_date }} {{ params.db_params.new_text }} --runner DataflowRunner --key path_to_creds_json_file --project project_name --staging_location staging_gcp_bucket_location --temp_location=temp_gcp_bucket_location --job_name test-job", params={ 'db_params': { 'new_text': 'Hello World' } }, dag=dag ) 因此,这就是我们在气流UI的日志中看到的内容。

[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all params -> ['/usr/local/airflow/dags/test_context.py', '2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1'] [2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all arguments -> ('2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1') [2019-09-25 06:44:44,106] {bash_operator.py:132} INFO - Command exited with return code 0

展开
收起
被纵养的懒猫 2019-09-25 15:17:43 888 0
0 条回答
写回答
取消 提交回答
问答分类:
问答标签:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
From Python Scikit-Learn to Sc 立即下载
Data Pre-Processing in Python: 立即下载
双剑合璧-Python和大数据计算平台的结合 立即下载