Schedulerx2.0分布式计算原理&最佳实践

简介: 1. 前言 Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。

1. 前言

Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。

这篇文章重点是介绍基于schedulerx2.0的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)

2. 可扩展的执行引擎

Worker总体架构参考Yarn的架构,分为TaskMaster, Container, Processor三层:

image

  • TaskMaster:类似于yarn的AppMaster,支持可扩展的分布式执行框架,进行整个jobInstance的生命周期管理、container的资源管理,同时还有failover等能力。默认实现StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
  • Container:执行业务逻辑的容器框架,支持线程/进程/docker/actor等。
  • Processor:业务逻辑框架,不同的processor表示不同的任务类型。

以MapTaskMaster为例,大概的原理如下图所示:

image

3. 分布式编程模型之Map模型

Schedulerx2.0提供了多种分布式编程模型,这篇文章主要介绍Map模型(之后的文章还会介绍MapReduce模型,适用更多的业务场景),简单几行代码就可以将海量数据分布式到多台机器上进行分布式跑批,非常简单易用。

针对不同的跑批场景,map模型作业还提供了并行计算、内存网格、网格计算三种执行方式:

  • 并行计算:子任务300以下,有子任务列表。
  • 内存网格:子任务5W以下,无子任务列表,速度快。
  • 网格计算:子任务100W以下,无子任务列表。

4. 并行计算原理

因为并行任务具有子任务列表:

image
如上图,子任务列表可以看到每个子任务的状态、机器,还有重跑、查看日志等操作。

因为并行计算要做到子任务级别的可视化,并且worker挂了、重启还能支持手动重跑,就需要把task持久化到server端:

image
如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到server端。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

5. 网格计算原理

网格计算要支持百万级别的task,如果所有任务都往server回写,server肯定扛不住,所以网格计算的存储实际上是分布式在用户自己的机器上的:
image
如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到本地h2数据库。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

6. 最佳实践

6.1 需求

举个例子:

  1. 读取A表中status=0的数据。
  2. 处理这些数据,插入B表。
  3. 把A表中处理过的数据的修改status=1。
  4. 数据量有4亿+,希望缩短时间。

6.2 反面案例

我们先看下如下代码是否有问题?

public class ScanSingleTableProcessor extends MapJobProcessor {
    private static int pageSize = 1000;

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();

        if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
            int recordCount = queryRecordCount();
            int pageAmount = recordCount / pageSize;//计算分页数量
            for(int i = 0 ; i < pageAmount ; i ++) {
                List<Record> recordList = queryRecord(i);//根据分页查询一页数据
                map(recordList, "record记录");//把子任务分发出去并行处理
            }
            return new ProcessResult(true);//true表示执行成功,false表示失败
        } else if ("record记录".equals(taskName)) {
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}

如上面的代码所示,在root任务中,会把数据库所有记录读取出来,每一行就是一个Record,然后分发出去,分布式到不同的worker上去执行。逻辑是没有问题的,但是实际上性能非常的差。结合网格计算原理,我们把上面的代码绘制成下面这幅图:
image
如上图所示,root任务一开始会全量的读取A表的数据,然后会全量的存到h2中,pull线程还会全量的从h2读取一次所有的task,还会分发给所有客户端。所以实际上对A表中的数据:

  • 全量读2次
  • 全量写一次
  • 全量传输一次

这个效率是非常低的。

6.3 正面案例

下面给出正面案例的代码:

public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //计算分页数量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }
}

如上面的代码所示,

  • 每个task不是整行记录的record,而是PageTask,里面就2个字段,startId和endId。
  • root任务,没有全量的读取A表,而是读一下整张表的minId和maxId,然后构造PageTask进行分页。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每个task处理A表不同的数据。
  • 在下一级task中,如果拿到的是PageTask,再根据id区间去A表处理数据。

根据上面的代码和网格计算原理,得出下面这幅图:
image
如上图所示,

  • A表只需要全量读取一次。
  • 子任务数量比反面案例少了上千、上万倍。
  • 子任务的body非常小,如果recod中有大字段,也少了上千、上万倍。

综上,对A表访问次数少了好几倍,对h2存储压力少了上万倍,不但执行速度可以快很多,还保证不会把自己本地的h2数据库搞挂。

目录
相关文章
|
3月前
|
NoSQL Java 测试技术
字节二面:Spring Boot Redis 可重入分布式锁实现原理?
字节二面:Spring Boot Redis 可重入分布式锁实现原理?
157 1
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
58 0
|
4月前
|
NoSQL 关系型数据库 MySQL
分布式锁的原理解析与实现工具介绍
分布式锁的原理解析与实现工具介绍
35 1
|
2月前
|
存储 数据采集 监控
SkyWalking全景解析:从原理到实现的分布式追踪之旅
SkyWalking全景解析:从原理到实现的分布式追踪之旅
297 1
|
19天前
|
设计模式 安全 Java
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
23 0
|
3月前
|
存储 安全 JavaScript
【分布式技术专题】「授权认证体系」深度解析OAuth2.0协议的原理和流程框架实现指南(授权流程和模式)
在传统的客户端-服务器身份验证模式中,客户端请求服务器上访问受限的资源(受保护的资源)时,需要使用资源所有者的凭据在服务器上进行身份验证。资源所有者为了给第三方应用提供受限资源的访问权限,需要与第三方共享它的凭据。这就导致一些问题和局限:
368 2
【分布式技术专题】「授权认证体系」深度解析OAuth2.0协议的原理和流程框架实现指南(授权流程和模式)
|
3月前
|
NoSQL 中间件 API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
80 2
|
3月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(上)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
71 0
|
19天前
|
缓存 算法 关系型数据库
深度思考:雪花算法snowflake分布式id生成原理详解
雪花算法snowflake是一种优秀的分布式ID生成方案,其优点突出:它能生成全局唯一且递增的ID,确保了数据的一致性和准确性;同时,该算法灵活性强,可自定义各部分bit位,满足不同业务场景的需求;此外,雪花算法生成ID的速度快,效率高,能有效应对高并发场景,是分布式系统中不可或缺的组件。
深度思考:雪花算法snowflake分布式id生成原理详解
|
19天前
|
存储 Java 应用服务中间件
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
42 0

热门文章

最新文章

相关实验场景

更多