阿里任务调度Schedulerx2.0之MapReduce模型

简介: 阿里巴巴任务调度Schedulerx2.0自研轻量级分布式模型MapReduce,可以进行大数据的实时/离线跑批。通过一个map方法就能将海量数据分布式到多台机器上执行,通过process方法处理子任务的业务,最后通过reduce方法可以获取所有子任务执行的状态和结果

简介

阿里巴巴任务调度Schedulerx2.0自研轻量级分布式模型MapReduce,可以进行大数据的实时/离线跑批。通过一个map方法就能将海量数据分布式到多台机器上执行,通过process方法处理子任务的业务,最后通过reduce方法可以获取所有子任务执行的状态和结果。常见场景,比如

  1. 电商领域:通过MapReduce模型不停轮询订单,进行订单确认。如果有订单超时未支付,则关闭订单。如果订单支付完成,并且各参数没问题,则更新订单状态为已完成。
  2. 物流领域:通过MapReduce模型不停扫描入仓的订单,通过订单的收件地址,进行拣货出库。
  3. IoT领域:通过MapReduce模型不停轮询所有设备的状态,如果发现有设备有故障,则汇报给主机,更新设备状态。


同时,MapReduce模型的任务,也可以结合工作流一起使用,通过reduce方法可以返回这次跑批的结果,进行工作流上下游数据传递。


对比大数据跑批的优势

速度快

大数据跑批,需要配合导入导出工具,先将传统数据库中的数据导入到大数据平台中,跑批结束后,再把结果导回数据库中,导入导出增加了很多时间开销。同时大部分大数据跑批(比如Hadoop的MapReduce模型)过程也比较慢,涉及到数据的拆分和中间结果的数据传输,比较耗时间。不适合用来做实时跑批。

Schedulerx2.0的轻量级MapReduce模型,可以直接操作用户的原始数据库数据,不涉及到数据的导入导出和中间结果的数据传输,可以作为实时业务的跑批。

数据安全

大数据跑批,需要首先将数据上传到大数据平台中,如果使用云厂商的大数据平台,用户往往担心数据安全问题。

Schedulerx2.0的跑批,不需要上传数据,计算节点也是用户自己的,没有任何安全问题。

成本低

大数据跑批,需要将数据上传到大数据平台,跑批过程消耗大数据的计算资源,需要为存储和计算成本买单。

Schedulerx2.0的跑批,不需要任何额外的存储和计算成本,只需要应用依赖一个jar包,即可以将应用自己的机器自建成一个分布式计算引擎,进行MapReduce模型的跑批。

编程简单

大数据跑批,需要学习大数据的知识,学习成本比较高。如果涉及到非常复杂的业务逻辑(比如需要比较多的条件判断和循环),无法通过大数据跑批解决。虽然大数据系统一般都有提供UDF,但是使用起来还是比较麻烦。

Schedulerx2.0的跑批,直接编写业务代码,兼容Spring原生语法,可以直接调用已经封装好的各种service代码,开发迅速,可读性高。


接口说明

使用MapReduce模型,只需要继承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor,该抽象类有如下接口须实现:

接口

是否必选

描述

public ProcessResult map(List<? extends Object> taskList, String taskName)

通过map方法分发子任务列表

public ProcessResult process(JobContext context) throws Exception

执行子任务的具体业务实现,通过JobContext可以拿到子任务的信息

public ProcessResult reduce(JobContext context)

所有子任务执行完,会回调reduce方法,可以在JobContext中拿到所有子任务的执行状态和结果

public void kill(JobContext context)

实现该方法,可以主动停止正在执行的子任务

public boolean runReduceIfFail(JobContext context)

如果有子任务失败,是否执行reduce方法,默认是


原理

Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:更多原理请参考

可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。

Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。

Demo

订单跑批

  1. 构建OrderInfo数据结构
packagecom.alibaba.schedulerx.test.info;
publicclassOrderInfo {
privateStringid;
privateintvalue;
publicOrderInfo(Stringid, intvalue) {
this.id=id;
this.value=value;
    }
publicStringgetId() {
returnid;
    }
publicvoidsetId(Stringid) {
this.id=id;
    }
publicintgetValue() {
returnvalue;
    }
publicvoidsetValue(intvalue) {
this.value=value;
    }
@OverridepublicStringtoString() {
return"OrderInfo [id="+id+", value="+value+"]";
    }
}
  1. 继承MapReduceProcessor,创建订单处理任务
packagecom.alibaba.schedulerx.test.processor;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importcom.alibaba.schedulerx.common.domain.TaskStatus;
importcom.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
importcom.alibaba.schedulerx.worker.domain.JobContext;
importcom.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
importcom.alibaba.schedulerx.worker.processor.ProcessResult;
publicclassTradeOrderJobextendsMapReduceJobProcessor {
@OverridepublicProcessResultprocess(JobContextcontext) {
StringtaskName=context.getTaskName();
if (isRootTask(context)) {
System.out.println("start root task");
List<OrderInfo>orderInfos=getOrderInfos();
returnmap(orderInfos, "OrderInfo");
        } elseif (taskName.equals("OrderInfo")) {
OrderInfoorderInfo= (OrderInfo)context.getTask();
System.out.print(orderInfo);
returnnewProcessResult(true, String.valueOf(orderInfo.getValue()));
        }
returnnewProcessResult(false);
    }
@OverridepublicProcessResultreduce(JobContextcontext) throwsException {
Map<Long, String>allTaskResults=context.getTaskResults();
Map<Long, TaskStatus>allTaskStatuses=context.getTaskStatuses();
longsum=0;
for (Entry<Long, String>entry : allTaskResults.entrySet()) {
System.out.println(entry.getKey() +":"+entry.getValue());
// 过滤根任务if (entry.getKey() ==0) {
continue;
            }
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
sum+=Integer.valueOf(entry.getValue());
            }
        }
System.out.print("reduce: count="+sum);
returnnewProcessResult(true, String.valueOf(sum));
    }
/*** 获取订单列表,具体实现需要自己从数据中获取*/privateList<OrderInfo>getOrderInfos() {
List<OrderInfo>orderList=newArrayList<>();
for (inti=1; i<=20; i++) {
OrderInfoorderInfo=newOrderInfo("id_"+i, i);
orderList.add(orderInfo);
        }
returnorderList;
    }
}
  1. 注册2个worker实例,执行任务如下

可以看到如程序描述,构造了20个订单,分别由两个worker分布式执行,最后reduce汇总结果为1+2+...+20=210


在工作流中的集成使用

举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算

MapReduce任务。代码如下:

publicclassTestJobAextendsJavaProcessor {
@OverridepublicProcessResultprocess(JobContextcontext) throwsException {
System.out.println("hello JobA");
returnnewProcessResult(true, String.valueOf(10));
    }
}


publicclassTestJobBextendsMapReduceJobProcessor {
@OverridepublicProcessResultprocess(JobContextcontext) {
StringexecutorName=context.getTaskName();
if (executorName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
System.out.println("start root task");
StringupstreamData=context.getUpstreamData().get(0).getData();
intdispatchNum=Integer.valueOf(upstreamData);
List<String>msgList=Lists.newArrayList();
for (inti=0; i<=dispatchNum; i++) {
msgList.add("msg_"+i);
            }
returnmap(msgList, "Level1Dispatch");
        } elseif (executorName.equals("Level1Dispatch")) {
Stringexecutor= (String)context.getTask();
System.out.println(executor);
returnnewProcessResult(true);
        }
returnnewProcessResult(false);
    }
publicProcessResultreduce(JobContextcontext) throwsException {
returnnewProcessResult(true, "520");
    }
}


publicclassTestJobCextendsJavaProcessor {
@OverridepublicProcessResultprocess(JobContextcontext) throwsException {
System.out.println("hello JobC");
StringupstreamData=context.getUpstreamData().get(0).getData();
System.out.print(upstreamData);
returnnewProcessResult(true);
    }
}

执行结果如下:

jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。


如何快速定位任务失败的原因

如果通过MapReduce跑批的业务有问题,如何快速在分布式应用中分析问题(比如订单跑批业务,如何快速分析出有问题的订单处)?

阿里任务调度SchedulerX提供了日志服务解决方案,可以搜集每次跑批的日志,通过控制台可以快速检索出问题的原因。以Demo-订单跑批为例,进行简单修改

packagecom.alibaba.schedulerx.test.processor;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importorg.apache.logging.log4j.LogManager;
importorg.apache.logging.log4j.Logger;
importcom.alibaba.schedulerx.common.domain.TaskStatus;
importcom.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
importcom.alibaba.schedulerx.worker.domain.JobContext;
importcom.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
importcom.alibaba.schedulerx.worker.processor.ProcessResult;
publicclassTradeOrderJobextendsMapReduceJobProcessor {
privatestaticfinalLoggerLOGGER=LogManager.getLogger("schedulerx");
@OverridepublicProcessResultprocess(JobContextcontext) {
StringtaskName=context.getTaskName();
if (isRootTask(context)) {
LOGGER.info("start root task");
List<OrderInfo>orderInfos=getOrderInfos();
returnmap(orderInfos, "OrderInfo");
        } elseif (taskName.equals("OrderInfo")) {
OrderInfoorderInfo= (OrderInfo)context.getTask();
//id_10这个订单,构造一个异常(1/0会抛异常)if (orderInfo.getId().equals("id_10")) {
inta=1/0;
            }
LOGGER.info(orderInfo);
returnnewProcessResult(true, String.valueOf(orderInfo.getValue()));
        }
returnnewProcessResult(false);
    }
@OverridepublicProcessResultreduce(JobContextcontext) throwsException {
Map<Long, String>allTaskResults=context.getTaskResults();
Map<Long, TaskStatus>allTaskStatuses=context.getTaskStatuses();
longsum=0;
for (Entry<Long, String>entry : allTaskResults.entrySet()) {
// 过滤根任务if (entry.getKey() ==0) {
continue;
            }
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
sum+=Integer.valueOf(entry.getValue());
            }
        }
LOGGER.info("reduce: count="+sum);
returnnewProcessResult(true, String.valueOf(sum));
    }
/*** 获取订单列表,具体实现需要自己从数据中获取*/privateList<OrderInfo>getOrderInfos() {
List<OrderInfo>orderList=newArrayList<>();
for (inti=1; i<=20; i++) {
OrderInfoorderInfo=newOrderInfo("id_"+i, i);
orderList.add(orderInfo);
        }
returnorderList;
    }
}

通过任务管理->历史记录,可以看到最近执行的历史,发现任务执行失败了

点击日志,可以看到这次跑批的所有日志

通过关键字搜索"ERROR",可以快速定位到失败的原因

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
59 0
|
6天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
18 2
|
分布式计算 数据挖掘 Java
MapReduce 基础模型|学习笔记
快速学习 MapReduce 基础模型
140 0
MapReduce 基础模型|学习笔记
|
分布式计算 Python
Python实现一个最简单的MapReduce编程模型WordCount
Python实现一个最简单的MapReduce编程模型WordCount
114 0
|
分布式计算 资源调度 并行计算
|
分布式计算 大数据 Java
MapReduce 编程模型 & WordCount 示例(下)
之前在学习大数据的时候,很多东西很零散的做了一些笔记,但是都没有好好去整理它们,这篇文章也是对之前的笔记的整理,或者叫输出吧。一来是加深自己的理解,二来是希望这些东西能帮助想要学习大数据或者说正在学习大数据的朋友。如果你看到里面的东西,让你知道了它,这也是一种进步嘛。说不定就开启了你的另一扇大门呢?
MapReduce 编程模型 & WordCount 示例(下)
|
存储 分布式计算 大数据
MapReduce 编程模型 & WordCount 示例(上)
之前在学习大数据的时候,很多东西很零散的做了一些笔记,但是都没有好好去整理它们,这篇文章也是对之前的笔记的整理,或者叫输出吧。一来是加深自己的理解,二来是希望这些东西能帮助想要学习大数据或者说正在学习大数据的朋友。如果你看到里面的东西,让你知道了它,这也是一种进步嘛。说不定就开启了你的另一扇大门呢?
MapReduce 编程模型 & WordCount 示例(上)
|
分布式计算 自然语言处理 Hadoop
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(二)
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(二)
119 0
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(二)
|
存储 分布式计算 资源调度
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(一)
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(一)
171 0
【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型(一)
|
分布式计算 数据挖掘 Java
MapReduce 基础模型|学习笔记
快速学习 MapReduce 基础模型
129 0
MapReduce 基础模型|学习笔记

相关实验场景

更多