Schedulerx2.0工作流支持数据传输

本文涉及的产品
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 1个月
简介: 1. 前言Schedulerx2.0是阿里中间件自研的基于akka架构的新一代分布式任务调度平台,提供定时、任务编排、分布式跑批等功能,具有高可靠、海量任务、秒级调度等能力。Schedulerx2.0提供可视化的工作流进行任务编排,该文章将详细介绍如何使用schedulerx2.0的工作流进行上下游任务的数据传输。

1. 前言

Schedulerx2.0是阿里中间件自研的基于akka架构的新一代分布式任务调度平台,提供定时、任务编排、分布式跑批等功能,具有高可靠、海量任务、秒级调度等能力。

Schedulerx2.0提供可视化的工作流进行任务编排,该文章将详细介绍如何使用schedulerx2.0的工作流进行上下游任务的数据传输。

2. 接口介绍

2.1 支持的执行方式和任务类型

当前只有java任务支持数据传输,网格计算请使用MapReduce模型进行数据传输。

2.2 返回执行结果

/**
 *
 * @param status
 * @param result, the size should less than 1000 bytes
 * @throws Exception
 */
public ProcessResult(boolean status, String result) throws Exception;

在Processor结尾,通过该结果替代ProcessResult(boolean status),可以返回执行结果。

result的长度不能超过1000个字节(注意,不是String的长度,如果有中文字符,可能会超过1000个字节!),如果超过1000个字节,任务会失败。

2.3 获取上游数据

List<JobInstanceData> upstreamDatas = JobContext.getUpstreamData();

在Processor里,可以通过该接口从JobContext中拿到上游的数据。上游的数据是一个list(可能有多个父节点),JobInstanceData里有两个属性,分别是jobName和data(String类型)。

3. Demo演示

首先我们写三个jobProcessor

public class TestSimpleJobA extends JavaProcessor {
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
        return new ProcessResult(true, String.valueOf(1));
    }
}
public class TestSimpleJobB extends JavaProcessor {
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
        return new ProcessResult(true, String.valueOf(2));
    }
}
public class TestSimpleJobC extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        List<JobInstanceData> upstreamDatas = context.getUpstreamData();
        int sum = 0;
        for (JobInstanceData jobInstanceData : upstreamDatas) {
            System.out.println("jobName=" + jobInstanceData.getJobName() + ", data=" + jobInstanceData.getData());
            sum += Integer.valueOf(jobInstanceData.getData());
        }
        System.out.println("TestSimpleJobC sum=" + sum);
        return new ProcessResult(true, String.valueOf(sum));
    }

}

通过控制台配置工作流如下图所示
image

触发一次该工作流,然后进入工作流实例图,右键jobA的实例,进入详情,可以看到jobA实例结果=1,如下图
image
同理,可以看到jobB的实例结果=2, jobC的实例结果=3

控制台也能看到jobC的机器打印

jobName=jobB, data=2
jobName=jobA, data=1
TestSimpleJobC sum=3
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
目录
相关文章
|
分布式计算 并行计算 数据库
Schedulerx2.0分布式计算原理&最佳实践
1. 前言 Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。
23588 2
|
11月前
|
缓存 Cloud Native 调度
Fluid支持分层数据缓存本地性调度(Tiered Locality Scheduling)
依赖容器化带来的高效部署、敏捷迭代,以及云计算在资源成本和弹性扩展方面的天然优势,以 Kubernetes 为代表的云原生编排框架吸引着越来越多的 AI 与大数据应用在其上部署和运行。但是数据密集型应用计算框架的设计理念和云原生灵活的应用编排的分歧,导致了数据访问和计算瓶颈。 CNCF开源项目Fluid作为 AI 与大数据云原生应用提供一层高效便捷的数据抽象,将数据从存储抽象出来,针对具体的场景(比如大模型),加速计算访问数据。
865 0
|
资源调度 分布式计算 运维
阿里巴巴任务调度SchedulerX支持一次性任务
阿里巴巴任务调度SchedulerX2.0支持一次性任务
1225 2
|
资源调度 运维 DataWorks
阿里分布式任务调度SchedulerX2.0支持Dataworks任务
在实际业务场景中业务处理往往依赖前置数据准备,目前在分布式任务调度平台上可进行dataworks任务数据处理与业务数据处理任务依赖编排定时调度。
1206 1
|
资源调度 分布式计算 自然语言处理
EDAS之分布式任务调度SchedulerX系列文章
分布式任务调度SchedulerX2.0文章列表总览
594 1
|
运维 资源调度 监控
阿里巴巴任务调度SchedulerX兼容ElasticJob
阿里巴巴任务调度SchedulerX2.0兼容开源ElasticJob任务接口,用户不需要修改一行代码,即可以将ElasticJob任务在SchedulerX2.0平台上托管,享有低成本、免运维、可视化、报警监控等能力。
984 0
阿里巴巴任务调度SchedulerX兼容ElasticJob
|
缓存 资源调度 运维
SchedulerX 如何帮助用户解决分布式任务调度难题?
本文分别对任务调度平台的资源定义、可视化管控能力、分布式批处理能力进行了简述,并基于 SchedulerX 的能力结合实际业务场景提供了一些基础参考案例。希望通过上述内容能让大家方便地熟悉任务调度平台接入使用概况,对于现有用户也可结合自身团队特点进行平台资源管控隔离,以及在产品业务量增长后通过分布式批处理能力来提升处理效率。
SchedulerX 如何帮助用户解决分布式任务调度难题?
|
资源调度 监控 数据可视化
阿里巴巴任务调度SchedulerX支持日志服务
阿里巴巴任务调度SchedulerX2.0的日志服务,可以让业务方不需要修改一行代码,只需要增加一个log4j2/logback的配置,即可将每次任务调度的框架日志和业务日志进行收集,同时提供白屏日志检索功能,可以通过任务调度平台快速定位任务失败的原因。
1171 0
|
资源调度 分布式计算 运维
SchedulerX2.0支持一次性任务
SchedulerX2.0支持一次性任务
519 0
|
缓存 资源调度 分布式计算
阿里云分布式任务调度SchedulerX2.0正式商业化
Schedulerx2.0在公有云公测2年,服务超过1000家公司,积累了丰富的经验,稳定性也得到了足够的验证。为了提供更优质的服务,于2021.9.1正式商业化,同时也会带来更加强大的能力
1747 0