使用Airflow调度MaxCompute

简介: MaxCompute对外提供了Python接口,通过提供的接口使用AirFlow进行调度

一、环境准备

• Python 2.7.5 PyODPS支持Python2.6以上版本
• Airflow apache-airflow-1.10.7
1.安装MaxCompute需要的包

pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。
pip install cython>=0.19.0  # 可选,不建议Windows用户安装。
pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本
2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

#-*- coding: UTF-8 -*-
import sys
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import time
#修改系统默认编码。
reload(sys)
sys.setdefaultencoding('utf8')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date':datetime(2020,1,15),
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
#打印时间
def get_time():
    print '当前时间是{}'.format(time.time())
    return time.time()
#执行MaxCompute的查询任务
def mc_job ():
    #MaxCompute参数设置
    options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
    
    odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',endpoint='**your-end-point**')
    # project = odps.get_project('my_project')  # 取到某个项目。
    project = odps.get_project()  # 取到默认项目。
    # 获取表。
    # t = odps.get_table('tableName')
    # 接受传入的分区参数。
    with odps.execute_sql('select * from tableName').open_reader() as reader:
        count = reader.count
    print("查询表数据条数:{}".format(count))
    for record in reader:
        print record
    return count
t1 = PythonOperator (
    task_id = 'get_time' ,
    provide_context = False ,
    python_callable = get_Time ,
    dag = dag )

t2 = PythonOperator (
    task_id = 'mc_job' ,
    provide_context = False ,
    python_callable = mc_job ,
    dag = dag )
t2.set_upstream(t1)

2.提交

python Airiflow_MC.py

3.进行测试

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks Airiflow_MC

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks Airiflow_MC --tree
#测试task
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务
登录到web界面点击按钮运行

5.查看任务运行结果
1.点击view log
image.png

2.查看结果

image.png

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
7月前
|
分布式计算 Java 大数据
大数据常用调度平台
大数据常用调度平台
196 0
|
存储 SQL 消息中间件
大数据生态圈常用组件(一):数据库、查询引擎、ETL工具、调度工具等
大数据生态圈常用组件(一):数据库、查询引擎、ETL工具、调度工具等
|
2月前
|
存储 分布式计算 调度
MaxCompute资源问题之删除调度资源如何解决
MaxCompute资源指的是在MaxCompute项目中使用的计算资源和存储资源;本合集旨在向用户展示如何高效管理MaxCompute资源,包括资源包管理、配额调整和性能优化等方面。
30 0
|
4月前
|
Oracle 关系型数据库 大数据
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
20 1
|
4月前
|
分布式计算 大数据 调度
大数据计算MaxCompute怎么将一个Quota的资源优先供给给标准模式的生产库调度使用?
大数据计算MaxCompute怎么将一个Quota的资源优先供给给标准模式的生产库调度使用?
33 2
|
4月前
|
资源调度 分布式计算 大数据
【云计算与大数据技术】资源管理、调度模型策略的讲解
【云计算与大数据技术】资源管理、调度模型策略的讲解
102 0
|
4月前
|
分布式计算 DataWorks 调度
DataWorks在绑定MaxCompute并进行周期性调度前
DataWorks在绑定MaxCompute并进行周期性调度前
37 3
|
7月前
|
存储 大数据 调度
亚信AISWare DataOS大数据中台套件介绍02——开发程序及调度上下线
DataOS是一款汇总大部分大数据套件的企业型中台,可以满足企业大部分对大数据存储、计算、稽核的需求。但是平台使用体检并不好,所以有条件还是推荐使用阿里云的大数据组件
224 1
|
11月前
|
算法 大数据 数据处理
大数据开发基础的操作系统的内存管理和调度
在大数据开发中,操作系统的内存管理和调度是非常重要的概念。这些概念可以帮助我们更好地理解计算机系统的工作原理和方式,并且对于实现高效的大数据处理和传输具有重要的意义。以下是这些概念的简要介绍。
68 0
|
大数据 调度
《大数据在物流行业应用突破——大规模云上调度实践》电子版地址
大数据在物流行业应用突破——大规模云上调度实践
54 0
《大数据在物流行业应用突破——大规模云上调度实践》电子版地址

相关产品

  • 云原生大数据计算服务 MaxCompute