天任务依赖分钟任务-阿里云开发者社区

开发者社区> 大数据> 正文

天任务依赖分钟任务

简介: 本文主要是描述天任务依赖分钟任务的一个业务流程,数据同步每天5分钟跑一次,下游依赖一个天任务来计算今天所有同步任务的数据。

分钟依赖天调度

背景

每5分钟抽取一次数据,待每天0:00的同步任务抽取完成后,对当天总共288次同步任务抽取的所有数据进行计算。

实现思路

本次实验涉及的大数据产品有MaxCompute(大数据计算服务)DataWorks(数据工场,原大数据开发套件)

创建一个同步任务为上游,一个SQL为下游;
同步任务调度时间设置为每5分组调度一次(开始时间0:00,结束时间23:59,时间间隔5分钟),并配置[依赖上一周期-本节点]以形成自依赖;
SQL任务设置为每天0:00调度一次。

实现原理

在DataWorks调度系统中,下游对上游的依赖遵循原则为:下游任务生成的实例会找到当天离自己最近结束的一个上游实例作为上游依赖,如上游依赖实例运行成功,才会触发本节点实例运行。
如上游节点每天生成多个实例,则下游无法识别是哪一个实例离它最近结束,因此会导致必须等上游当天生成的所有实例运行完成后才会运行;因此,上游必须配置自依赖,SQL任务在0:00的实例才会准确依赖于0:00生成的同步任务实例结束后再运行。

前提条件

  1. 您在开始本次实验前,需要确保自己有阿里云云账号并已实名认证。详情请参见注册阿里云账号企业实名认证个人实名认证
  2. 创建数据库,准备源端数据和目标端表。

创建业务流程

  1. 登录阿里云官网,单击右上角的登录,填写您的阿里云账号和密码。
  2. 选择产品 >大数据 >大数据开发 >DataWorks >管理控制台 DataWorks管理控制台详情页,单击数据开发界面创建workshop业务流程,如下图:

1

分钟任务

上游:数据集成任务mysql同步到odps,根据过滤条件过滤每5分钟更新的数据,目标端的分区是根据定时时间的前5分钟创建的,保证所有数据都写到同一天的分区里,如下图设置:

2

过滤条件:insert_time>=${startTime}  and  insert_time<${endTime}
调度配置里给上面参数赋值:startTime=$[yyyymmddhh24miss-5/24/60] endTime=$[yyyymmddhh24miss]

例如:insert_time>=20190320000000 and  insert_time<20190320000500  
过滤出20号零点00分到05分钟的数据保存到目标端ds=20190320000000的分区里,
依次类推20号23点55分到21号零点的数据保存到ds=20190320235500的分区里。
这样一天的数据都能保存在20号的分区里。

字段映射:源端和目标端都创建了3列id、name、insert_time。insert_time为时间列这样数据过滤可以根据时间来过滤。

3

调度配置界面:给过滤条件里的参数赋值:startTime=$[yyyymmddhh24miss-5/24/60] endTime=$[yyyymmddhh24miss] 每个参数之间用空格隔开。


4

开始时间是从00:00点开始,间隔5分钟调度一次,为了保证一天实例都运行完,需要设置自依赖,如下图的依赖本节点操作:

5

注意:上面配置能保证将您一天产生的实例能依次运行完并且保存到odps表里同一天的分区里。
如果您的分钟任务里有一个调度任务出错,那你了设置自依赖后面的实例都不会运行,这就需要您这边手动处理掉。
避免碰到上面问题,您也可以在数据过滤里设置成:insert_time<${endTime},但是每次都是全量同步,只要有成功的也可以将endTime数据同步过去,也就不用设置自依赖。但是这样会给您的数据库造成巨大的压力。

天调度任务:

下游:workshop_odps是一个sql节点天任务,将workshop_odps_mi一天分区里的数据都过滤出来插到 workshop_odps_dd表中。如下图设置:

6

insert overwrite table workshop_odps_dd partition (ds=${yestoday})
select id, name,insert_time from workshop_odps_mi where ${startTime}<=ds and ds<${endTime};
调度配置里的赋值:
startTime=$[yyyymmddhh24miss-1] endTime=$[yyyymmddhh24miss] yestoday=$[yyyymmdd-1]
将workshop_odps_mi一天分区里的数据都过滤出来插到 workshop_odps_dd表中。
例如:insert overwrite table workshop_odps_dd partition (ds=20190320)
select id, name,insert_time from workshop_odps_mi where 20190320000000<=ds and ds<20190321000000;
因为天运行定时时间会在分钟任务运行完后,所以插入workshop_odps_dd分区ds时间要跟分钟任务时间在同一天,
您将ds时间减1即可。

调度配置界面:

7

运行结果

同步任务源端数据库的数据总条数,一共120000条

10


同步插入的数据总条数120000条,所以一天的总数据全部插入到相应的表中。

11

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

其他文章