Schedulerx2.0支持MapReduce模型-阿里云开发者社区

开发者社区> 黄晓萌> 正文

Schedulerx2.0支持MapReduce模型

简介: 1. 前言 Schedulerx2.0提供了map模型,通过一个map方法就能将海量数据分布式到多台机器上分布式执行,随着业务方的深入使用,又提出了更多的需求,比如: 监听所有子任务完成的事件 处理所有子任务返回的订单号 汇总结果进行工作流数据传输 2. 简介 MapReduce模型是Map模型的扩展,废弃了postProcess方法,新增reduce接口,需要实现MapReduceJobProcessor。
+关注继续查看

1. 前言

Schedulerx2.0提供了map模型,通过一个map方法就能将海量数据分布式到多台机器上分布式执行,随着业务方的深入使用,又提出了更多的需求,比如:

  • 监听所有子任务完成的事件
  • 处理所有子任务返回的订单号
  • 汇总结果进行工作流数据传输

2. 简介

MapReduce模型是Map模型的扩展,废弃了postProcess方法,新增reduce接口,需要实现MapReduceJobProcessor。

MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,reduce不会执行。

MapReduce模型,还能处理所有子任务的结果。子任务通过return ProcessResult(true, result)返回结果(比如返回订单号),reduce的时候,可以通过context拿到所有子任务的结果,进行相应的处理,不需要业务方自己做存储。注意:所有子任务结果会缓存在master节点,对内存有压力,建议子任务个数和result不要太大。

3. 接口

  • public ProcessResult process(JobContext context) throws Exception; (必选)
  • public ProcessResult map(List<? extends Object> taskList, String taskName); (必选)
  • public ProcessResult reduce(JobContext context); (必选)
  • public void kill(JobContext context); (可选)

4. 执行方式

和map模型一样,MapReduce模型,也支持如下执行方式:

  • 并行计算:支持子任务300以下,有子任务列表。
  • 内存网格:基于内存计算,子任务5W以下,速度快。
  • 网格计算:基于文件计算,子任务100W以下。

5. 原理

Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:
image
可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。
Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。

6. 最佳实践

6.1 通过mapreduce进行工作流上下游传递

下面我举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算MapReduce任务。代码如下:

public class TestJobA extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobA");
        return new ProcessResult(true, String.valueOf(10));
    }

}
public class TestJobB extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) {
        String executorName = context.getTaskName();
        if (isRootTask(context)) {
            System.out.println("start root task");
            String upstreamData = context.getUpstreamData().get(0).getData();
            int dispatchNum = Integer.valueOf(upstreamData);
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (executorName.equals("Level1Dispatch")) {
            String executor = (String)context.getTask();
            System.out.println(executor);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "520");
    }

}
public class TestJobC extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobC");
        String upstreamData = context.getUpstreamData().get(0).getData();
        System.out.print(upstreamData);
        return new ProcessResult(true);
    }

}

执行结果如下:
image
jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。

6.2 Mapreduce处理所有子任务结果,由reduce汇总

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 10;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

reduce执行结果如下:

taskId:0, result:
taskId:1, result:msg_0
taskId:2, result:msg_1
taskId:3, result:msg_2
taskId:4, result:msg_3
taskId:5, result:msg_4
taskId:6, result:msg_5
taskId:7, result:msg_6
taskId:8, result:msg_7
taskId:9, result:msg_8
taskId:10, result:msg_9
taskId:11, result:msg_10

taskId=0表示的是根节点,一般不会返回结果,不需要管。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Fundebug微信小程序BUG监控服务支持Source Map
摘要: 自动还原真实出错位置,快速修复BUG。 Source Map功能 微信小程序的Source Map功能目前只在 iOS 6.7.2 及以上版本支持。 微信小程序在打包时,会将所有 js 代码打包成一个文件,从而减少体积,加快访问速度。
1209 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
4398 0
StreamingPro 支持Spark Structured Streaming
Structured Streaming 的文章参考这里: Spark 2.0 Structured Streaming 分析。2.0的时候只是把架子搭建起来了,当时也只支持FileSource(监控目录增量文件),到2.0.2后支持Kafka了,也就进入实用阶段了,目前只支持0.10的Kafka。
1795 0
E-MapReduce的Presto组件默认支持访问oss数据
阿里云E-MapReduce从EMR-2.1.0版本镜像开始,Presto组件默认就支持访问oss数据了,不再需要引导操作额外支持。
2277 0
Schedulerx2.0支持多语言版本的分片模型
1. 简介 任务调度系统可以对多种任务进行调度(定时、编排、重刷历史数据等),有些任务调度系统还提供了分布式任务,帮助用户解决大数据处理的难题。分布式任务主要分为静态分片和动态分片。 1.1 静态分片 主要场景是处理固定的分片数,比如分库分表固定1024张表,需要若干台机器分布式去处理。
2614 0
StreamingPro添加Scala script 模块支持
SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。
1951 0
+关注
黄晓萌
新一代分布式任务调度Schedulerx2.0的发起者
12
文章
0
问答
来源圈子
更多
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载