开发者社区 问答 正文

Python-通过Airflow DAG进行数据流作业

我正在尝试通过Airflow中的BashOperator使用数据流运行器来执行apap梁管道python文件。我已经知道如何将参数动态传递给python文件。我期待优化参数-避免单独发送所有参数。示例片段:

text_context.py

import sys

def run_awc_orders(*args, **kwargs): print("all arguments -> ", args)

if name == "main": print("all params -> ", sys.argv) run_awc_orders( sys.argv[1], sys.argv[2], sys.argv[3]) my_dag.py

test_DF_job = BashOperator( task_id='test_DF_job', provide_context=True, bash_command="python /usr/local/airflow/dags/test_context.py {{ execution_date }} {{ next_execution_date }} {{ params.db_params.new_text }} --runner DataflowRunner --key path_to_creds_json_file --project project_name --staging_location staging_gcp_bucket_location --temp_location=temp_gcp_bucket_location --job_name test-job", params={ 'db_params': { 'new_text': 'Hello World' } }, dag=dag ) 因此,这就是我们在气流UI的日志中看到的内容。

[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all params -> ['/usr/local/airflow/dags/test_context.py', '2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1'] [2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all arguments -> ('2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1') [2019-09-25 06:44:44,106] {bash_operator.py:132} INFO - Command exited with return code 0

展开
收起
被纵养的懒猫 2019-09-25 15:17:43 909 分享 版权
阿里云 AI 助理回答

在您的场景中,您希望优化参数传递给通过BashOperator执行的Python脚本的方式。目前,您是直接将参数作为命令行参数传递,并且每个参数都显式地在bash_command中列出。为了更高效和灵活地处理参数,您可以考虑以下几种方法:

1. 使用JSON字符串

将所有参数打包成一个JSON对象,然后在Python脚本中解析这个JSON字符串。

修改my_dag.py:

import json

# 准备参数字典
params_dict = {
    'execution_date': '{{ execution_date }}',
    'next_execution_date': '{{ next_execution_date }}',
    'new_text': '{{ params.db_params.new_text }}',
    'runner': 'DataflowRunner',
    'key': 'path_to_creds_json_file',
    'project': 'project_name',
    'staging_location': 'staging_gcp_bucket_location',
    'temp_location': 'temp_gcp_bucket_location',
    'job_name': 'test-job'
}

# 将字典转换为JSON字符串
json_params = json.dumps(params_dict)

test_DF_job = BashOperator(
    task_id='test_DF_job',
    provide_context=True,
    bash_command=f"python /usr/local/airflow/dags/test_context.py '{json_params}'",
    params={'db_params': {'new_text': 'Hello World'}},
    dag=dag
)

修改test_context.py:

import sys
import json

def run_awc_orders(params):
    print("all parameters -> ", params)

if __name__ == "__main__":
    json_str = sys.argv[1]
    params = json.loads(json_str)
    run_awc_orders(params)

这种方法允许您以结构化的方式传递任意数量的参数,同时保持代码的可读性和灵活性。

2. 使用环境变量

另一种方式是使用环境变量来传递参数。这在需要传递敏感信息(如密钥)时特别有用,因为环境变量不会直接出现在日志中。

修改my_dag.py:

env_vars = {
    'EXECUTION_DATE': '{{ execution_date }}',
    'NEXT_EXECUTION_DATE': '{{ next_execution_date }}',
    'NEW_TEXT': '{{ params.db_params.new_text }}',
    # 其他环境变量...
}

test_DF_job = BashOperator(
    task_id='test_DF_job',
    provide_context=True,
    env=env_vars,  # 直接设置环境变量
    bash_command="python /usr/local/airflow/dags/test_context.py",
    params={'db_params': {'new_text': 'Hello World'}},
    dag=dag
)

修改test_context.py:

import os

def run_awc_orders():
    print("Execution Date -> ", os.environ.get('EXECUTION_DATE'))
    print("Next Execution Date -> ", os.environ.get('NEXT_EXECUTION_DATE'))
    print("New Text -> ", os.environ.get('NEW_TEXT'))

if __name__ == "__main__":
    run_awc_orders()

这两种方法都可以帮助您避免在Bash命令中硬编码多个参数,使代码更加整洁和易于维护。选择哪种方法取决于您的具体需求,比如参数的数量、类型以及是否包含敏感信息。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答
问答分类:
问答标签:
问答地址: