使用AirFlow调度MaxCompute

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

  • Python 2.7.5  PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0  # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功


python -c "from odps import ODPS"


二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py


# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding('utf8')

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

cfg = ConfigParser()

cfg.read("odps.ini")

print(cfg.items())

odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

default_args = {

   'owner': 'airflow',

   'depends_on_past': False,

   'retry_delay': timedelta(minutes=5),

   'start_date':datetime(2020,1,15)

   # 'email': ['airflow@example.com'],

   # 'email_on_failure': False,

   # 'email_on_retry': False,

   # 'retries': 1,

   # 'queue': 'bash_queue',

   # 'pool': 'backfill',

   # 'priority_weight': 10,

   # 'end_date': datetime(2016, 1, 1),

}

dag = DAG(

   'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):

   with io.open(sqlfile, encoding='utf-8', mode='r') as f:

       sql=f.read()

   f.closed

   return sql

def get_time():

   print '当前时间是{}'.format(time.time())

   return time.time()

def mc_job ():


   project = odps.get_project()  # 取到默认项目。

   instance=odps.run_sql("select * from long_chinese;")

   print(instance.get_logview_address())

   instance.wait_for_success()

   with instance.open_reader() as reader:

       count = reader.count

   print("查询表数据条数:{}".format(count))

   for record in reader:

       print record

   return count

t1 = PythonOperator (

   task_id = 'get_time' ,

   provide_context = False ,

   python_callable = get_time,

   dag = dag )


t2 = PythonOperator (

   task_id = 'mc_job' ,

   provide_context = False ,

   python_callable = mc_job ,

   dag = dag )

t2.set_upstream(t1)


2.提交


python Airiflow_MC.py

3.进行测试


# print the list of active DAGs

airflow list_dags


# prints the list of tasks the "tutorial" dag_id

airflow list_tasks Airiflow_MC


# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

03.png

5.查看任务运行结果

1.点击view log

04.png

2.查看结果



大家如果对MaxCompute有更多咨询或者建议,欢迎扫码加入 MaxCompute开发者社区钉钉群,或点击链接 申请加入。

MaxCompute 二维码拼图.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
分布式计算 Java 大数据
大数据常用调度平台
大数据常用调度平台
308 0
|
7月前
|
分布式计算 算法 大数据
MaxCompute操作报错合集之使用mf时,为什么还是把独享调度资源占满了
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
存储 资源调度 大数据
云计算在大数据分析中的弹性资源调度策略
云计算在大数据分析中的弹性资源调度策略
|
7月前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之怎么将一个Quota的资源优先供给给标准模式的生产库调度使用
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
8月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在开发环境中配置MaxCompute参数进行调度,但参数解析不出来,如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8月前
|
存储 分布式计算 调度
MaxCompute资源问题之删除调度资源如何解决
MaxCompute资源指的是在MaxCompute项目中使用的计算资源和存储资源;本合集旨在向用户展示如何高效管理MaxCompute资源,包括资源包管理、配额调整和性能优化等方面。
|
8月前
|
Oracle 关系型数据库 大数据
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
59 1
|
8月前
|
分布式计算 大数据 调度
大数据计算MaxCompute怎么将一个Quota的资源优先供给给标准模式的生产库调度使用?
大数据计算MaxCompute怎么将一个Quota的资源优先供给给标准模式的生产库调度使用?
79 2
|
8月前
|
分布式计算 DataWorks 调度
DataWorks在绑定MaxCompute并进行周期性调度前
DataWorks在绑定MaxCompute并进行周期性调度前
77 3
|
8月前
|
资源调度 分布式计算 大数据
【云计算与大数据技术】资源管理、调度模型策略的讲解
【云计算与大数据技术】资源管理、调度模型策略的讲解
615 0

相关产品

  • 云原生大数据计算服务 MaxCompute