1. 前言
Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。
这篇文章重点是介绍基于schedulerx2.0的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)
2. 可扩展的执行引擎
Worker总体架构参考Yarn的架构,分为TaskMaster, Container, Processor三层:
- TaskMaster:类似于yarn的AppMaster,支持可扩展的分布式执行框架,进行整个jobInstance的生命周期管理、container的资源管理,同时还有failover等能力。默认实现StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
- Container:执行业务逻辑的容器框架,支持线程/进程/docker/actor等。
- Processor:业务逻辑框架,不同的processor表示不同的任务类型。
以MapTaskMaster为例,大概的原理如下图所示:
3. 分布式编程模型之Map模型
Schedulerx2.0提供了多种分布式编程模型,这篇文章主要介绍Map模型(之后的文章还会介绍MapReduce模型,适用更多的业务场景),简单几行代码就可以将海量数据分布式到多台机器上进行分布式跑批,非常简单易用。
针对不同的跑批场景,map模型作业还提供了并行计算、内存网格、网格计算三种执行方式:
- 并行计算:子任务300以下,有子任务列表。
- 内存网格:子任务5W以下,无子任务列表,速度快。
- 网格计算:子任务100W以下,无子任务列表。
4. 并行计算原理
因为并行任务具有子任务列表:
如上图,子任务列表可以看到每个子任务的状态、机器,还有重跑、查看日志等操作。
因为并行计算要做到子任务级别的可视化,并且worker挂了、重启还能支持手动重跑,就需要把task持久化到server端:
如上图所示:
- server触发jobInstance到某个worker,选中为master。
- MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
- MapTaskMaster收到map方法,会把task持久化到server端。
- 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。
5. 网格计算原理
网格计算要支持百万级别的task,如果所有任务都往server回写,server肯定扛不住,所以网格计算的存储实际上是分布式在用户自己的机器上的:
如上图所示:
- server触发jobInstance到某个worker,选中为master。
- MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
- MapTaskMaster收到map方法,会把task持久化到本地h2数据库。
- 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。
6. 最佳实践
6.1 需求
举个例子:
- 读取A表中status=0的数据。
- 处理这些数据,插入B表。
- 把A表中处理过的数据的修改status=1。
- 数据量有4亿+,希望缩短时间。
6.2 反面案例
我们先看下如下代码是否有问题?
public class ScanSingleTableProcessor extends MapJobProcessor {
private static int pageSize = 1000;
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
int recordCount = queryRecordCount();
int pageAmount = recordCount / pageSize;//计算分页数量
for(int i = 0 ; i < pageAmount ; i ++) {
List<Record> recordList = queryRecord(i);//根据分页查询一页数据
map(recordList, "record记录");//把子任务分发出去并行处理
}
return new ProcessResult(true);//true表示执行成功,false表示失败
} else if ("record记录".equals(taskName)) {
//TODO
return new ProcessResult(true);
}
return new ProcessResult(false);
}
}
如上面的代码所示,在root任务中,会把数据库所有记录读取出来,每一行就是一个Record,然后分发出去,分布式到不同的worker上去执行。逻辑是没有问题的,但是实际上性能非常的差。结合网格计算原理,我们把上面的代码绘制成下面这幅图:
如上图所示,root任务一开始会全量的读取A表的数据,然后会全量的存到h2中,pull线程还会全量的从h2读取一次所有的task,还会分发给所有客户端。所以实际上对A表中的数据:
- 全量读2次
- 全量写一次
- 全量传输一次
这个效率是非常低的。
6.3 正面案例
下面给出正面案例的代码:
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //计算分页数量
for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx
return null;
}
}
如上面的代码所示,
- 每个task不是整行记录的record,而是PageTask,里面就2个字段,startId和endId。
- root任务,没有全量的读取A表,而是读一下整张表的minId和maxId,然后构造PageTask进行分页。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每个task处理A表不同的数据。
- 在下一级task中,如果拿到的是PageTask,再根据id区间去A表处理数据。
根据上面的代码和网格计算原理,得出下面这幅图:
如上图所示,
- A表只需要全量读取一次。
- 子任务数量比反面案例少了上千、上万倍。
- 子任务的body非常小,如果recod中有大字段,也少了上千、上万倍。
综上,对A表访问次数少了好几倍,对h2存储压力少了上万倍,不但执行速度可以快很多,还保证不会把自己本地的h2数据库搞挂。