自己动手实现分布式任务调度框架(续)(1)

简介: 自己动手实现分布式任务调度框架(续)

  之前写过一篇:自己动手实现分布式任务调度框架本来是用来闲来分享一下自己的思维方式,时至今日发现居然有些人正在使用了,本着对代码负责人的态度,对代码部分已知bug进行了修改,并增加了若干功能,如立即启动,实时停止等功能,新增加的功能会在这一篇做详细的说明。

  提到分布式任务调度,市面上本身已经有一些框架工具可以使用,但是个人觉得功能做的都太丰富,架构都过于复杂,所以才有了我重复造轮子。个人喜欢把复杂的问题简单化,利用有限的资源实现竟可能多的功能。因为有几个朋友问部署方式,这里再次强调下:我的这个服务可以直接打成jar放在自己本地仓库,然后依赖进去,或者直接copy代码过去,当成自己项目的一部分就可以了。也就是说跟随你们自己的项目启动,所以我这里也没有写界面。下面先谈谈怎么基于上次的代码实现任务立即启动吧!

  调度和自己服务整合后部署图抽象成如下:

  用户在前端点击立即请求按钮,通过各种负载均衡软件或者设备,到达某台机器的某个带有本调度框架的服务,然后进行具体的执行,也就是说这个立即启动就是一个最常见最简单的请求,没有过多复杂的问题(比如多节点会不会重复执行这些)。最简单的办法,当用户请求过来直接用一个线程或者线程池执行用户点的那个任务的逻辑代码就行了,当然我这里没有那么粗暴,现有的调度代码资源如下:

package com.rdpaas.task.scheduler;
import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 任务调度器
 * @author rongdi
 * @date 2019-03-13 21:15
*/
@Component
public class TaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
    @Autowired
    private TaskRepository taskRepository;
    @Autowired
    private NodeRepository nodeRepository;
    @Autowired
    private EasyJobConfig config;
    /**
     * 创建任务到期延时队列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();
    /**
     * 可以明确知道最多只会运行2个线程,直接使用系统自带工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);
    /**
     * 正在执行的任务的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();
    /**
     * 声明工作线程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 获取任务的策略
     */
    private Strategy strategy;
    @PostConstruct
    public void init() {
        /**
         * 根据配置选择一个节点获取任务的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,并且等待队列满后
         * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是因为Executors那几个线程工具
         * 各有各的弊端,不适合生产使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 执行待处理任务加载线程
         */
        bossPool.execute(new Loader());
        /**
         * 执行任务调度线程
         */
        bossPool.execute(new Boss());
    
    }
    class Loader implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先获取可用的节点列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找还有指定时间(单位秒)才开始的主任务列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不该自己拿就不要抢
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先设置成待执行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用乐观锁尝试更新状态,如果更新成功,其他节点就不会更新成功。如果其它节点也正在查询未完成的
                         * 任务列表和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不一样了,这里更新
                         * 必然会返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封装成延时对象放入延时队列,这里再查一次是因为上面乐观锁已经更新了版本,会导致后面结束任务更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 时间到了就可以从延时队列拿出任务对象,然后交给worker线程池去执行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正开始执行了设置成执行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader线程中已经使用乐观锁控制了,这里没必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到线程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暂存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }
    }
    class Worker implements Callable<String> {
        private Task task;
        public Worker(Task task) {
            this.task = task;
        }
        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //开始任务
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //执行任务
                task.getInvokor().invoke();
                //完成任务
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 执行完后删了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }
    }
    /**
     * 完成子任务,如果父任务失败了,子任务不会执行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {
        //查看是否有子类任务
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //当没有子任务时完成父任务
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //开始任务
                TaskDetail childDetail = null;
                try {
                    //将子任务状态改成执行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //开始子任务
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用乐观锁更新下状态,不然这里可能和恢复线程产生并发问题
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再从数据库取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //执行子任务
                        childTask.getInvokor().invoke();
                        //完成子任务
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 当有子任务时完成子任务后再完成父任务
             */
            taskRepository.finish(task,detail);
        }
    }
    /**
     * 添加任务
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }
    /**
     * 添加子任务
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }
   
}

  上面主要就是三组线程,Loader负责加载将要执行的任务放入本地的任务队列,Boss线程负责取出任务队列的任务,然后分配Worker线程池的一个线程去执行。由上面的代码可以看到如果要立即执行,其实只需要把一个延时为0的任务放入任务队列,等着Boss线程去取然后分配给worker执行就可以实现了,代码如下:

/**
     * 立即执行任务,就是设置一下延时为0加入任务队列就好了,这个可以外部直接调用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

  启动不用再多说,下面介绍一下停止任务,根据面向对象的思维,用户要想停止一个任务,最终执行停止任务的就是正在执行任务的那个节点。停止任务有两种情况,第一种任务没有正在运行如何停止,第二种是任务正在运行如何停止。第一种其实直接改变一下任务对象的状态为停止就行了,不必多说。下面主要考虑如何停止正在运行的任务,细心的朋友可能已经发现上面代码和之前那一篇代码有点区别,之前用的Runnble作为线程实现接口,这个用了Callable,其实在java中停止线程池中正在运行的线程最常用的就是直接调用future的cancel方法了,要想获取到这个future对象就需要将以前实现Runnbale改成实现Callable,然后提交到线程池由execute改成submit就可以了,然后每次提交到线程池得到的future对象使用taskId一起保存在一个map中,方便根据taskId随时找到。当然任务执行完后要及时删除这个map里的任务,以免常驻其中导致内存溢出。停止任务的请求流程如下

  图还是原来的图,但是这时候情况不一样了,因为停止任务的时候假如当前正在执行这个任务的节点处于服务1,负载均衡是不知道要去把你引到服务1的,他可能会引入到服务2,那就悲剧了,所以通用的做法就是停止请求过来不管落到哪个节点上,那个节点就往一个公用的mq上发一个带有停止任务业务含义的消息,各个节点订阅这个消息,然后判断都判断任务在不在自己这里执行,如果在就执行停止操作。但是这样势必让我们的调度服务又要依赖一个外部的消息队列服务,就算很方便的就可以引入一个外部的消息队列,但是你真的可以驾驭的了吗,消息丢了咋办,重复发送了咋办,消息服务挂了咋办,网络断了咋办,又引入了一大堆问题,那我是不是又要写n篇文章来分别解决这些问题。往往现实却是就是这么残酷,你解决了一个问题,引入了更多的问题,这就是为什么bug永远改不完的道理了。当然这不是我的风格,我的风格是利用有限的资源做尽可能多的事情(可能是由于我工作的企业都是那种资源贫瘠的,养成了我这种习惯,土豪公司的程序员请绕道,哈哈)。

  简化一下问题:目前的问题就是如何让正在执行任务的节点知道,然后停止正在执行的这个任务,其实就是这个停止通知如何实现。这不免让我想起了12306网站上买票,其实我们作为老百姓多么希望12306可以在有票的时候发个短信通知一下我们,然后我们上去抢,但是现实却是,你要么使用软件一直刷,要么是自己隔一段时间上去瞄一下有没有票。如果把有票了给我们发短信通知定义为异步通知,那么这种我们要隔一段时间自己去瞄一下的方式就是同步轮训。这两种方式都能达到告知的目的,关键的区别在于你到底有没有时间去一直去瞄,不过相比于可以回家,这些时间都是值得的。个人认为软件的设计其实就是一个权衡是否值得的过程。如果约定了不使用外部消息队列这种异步通知的方式,那么我们只能使用同步轮训的方式了。不过正好我们的任务调度本身已经有一个心跳机制,没隔一段时间就去更新一下节点状态,如果我们把用户的停止请求作为命令信息更新到每个节点的上,然后随着心跳获取到这个节点的信息,然后判断这个命令,做相应的处理是不是就可以完美解决这个问题。值得吗?很明显是值得的,我们只是在心跳逻辑上加一个小小的副作用就实现了通知功能了。代码如下

package com.rdpaas.task.common;
/**
 * @author rongdi
 * @date 2019/11/26
*/
public enum NotifyCmd {
    //没有通知,默认状态
    NO_NOTIFY(0),
    //开启任务(Task)
    START_TASK(1),
    //修改任务(Task)
    EDIT_TASK(2),
    //停止任务(Task)
    STOP_TASK(3);
    int id;
    NotifyCmd(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    public static NotifyCmd valueOf(int id) {
        switch (id) {
            case 1:
                return START_TASK;
            case 2:
                return EDIT_TASK;
            case 3:
                return STOP_TASK;
            default:
                return NO_NOTIFY;
        }
    }
}
package com.rdpaas.task.handles;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.utils.SpringContextUtil;
/**
 * @author: rongdi
 * @date:
*/
public interface NotifyHandler<T> {
    static NotifyHandler chooseHandler(NotifyCmd notifyCmd) {
        return SpringContextUtil.getByTypeAndName(NotifyHandler.class,notifyCmd.toString());
    }
    public void update(T t);
}
package com.rdpaas.task.handles;
import com.rdpaas.task.scheduler.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author: rongdi
 * @date:
*/
@Component("STOP_TASK")
public class StopTaskHandler implements NotifyHandler<Long> {
    @Autowired
    private TaskExecutor taskExecutor;
    @Override
    public void update(Long taskId) {
        taskExecutor.stop(taskId);
    }
}
class HeartBeat implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try {
                    /**
                     * 时间到了就可以从延时队列拿出节点对象,然后更新时间和序号,
                     * 最后再新建一个超时时间为心跳时间的节点对象放入延时队列,形成循环的心跳
                     */
                    DelayItem<Node> item = heartBeatQueue.take();
                    if(item != null && item.getItem() != null) {
                        Node node = item.getItem();
                        handHeartBeat(node);
                    }
                    heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId())));
                } catch (Exception e) {
                    logger.error("task heart beat error,cause by:{} ",e);
                }
            }
        }
    }
    /**
     * 处理节点心跳
     * @param node
     */
    private void handHeartBeat(Node node) {
        if(node == null) {
            return;
        }
        /**
         * 先看看数据库是否存在这个节点
         * 如果不存在:先查找下一个序号,然后设置到node对象中,最后插入
         * 如果存在:直接根据nodeId更新当前节点的序号和时间
         */
        Node currNode= nodeRepository.getByNodeId(node.getNodeId());
        if(currNode == null) {
            node.setRownum(nodeRepository.getNextRownum());
            nodeRepository.insert(node);
        } else  {
            nodeRepository.updateHeartBeat(node.getNodeId());
            NotifyCmd cmd = currNode.getNotifyCmd();
            String notifyValue = currNode.getNotifyValue();
            if(cmd != null && cmd != NotifyCmd.NO_NOTIFY) {
                /**
                 * 借助心跳做一下通知的事情,比如及时停止正在执行的任务
                 * 根据指令名称查找Handler
                 */
                NotifyHandler handler = NotifyHandler.chooseHandler(currNode.getNotifyCmd());
                if(handler == null || StringUtils.isEmpty(notifyValue)) {
                    return;
                }
                /**
                 * 执行操作
                 */
                handler.update(Long.valueOf(notifyValue));
            }
            
        }
    }


自己动手实现分布式任务调度框架(续)(2)https://developer.aliyun.com/article/1542828

相关文章
|
1天前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
14 3
|
3天前
|
负载均衡 监控 Dubbo
分布式框架-dubbo
分布式框架-dubbo
|
18天前
|
运维 NoSQL Java
SpringBoot接入轻量级分布式日志框架GrayLog技术分享
在当今的软件开发环境中,日志管理扮演着至关重要的角色,尤其是在微服务架构下,分布式日志的统一收集、分析和展示成为了开发者和运维人员必须面对的问题。GrayLog作为一个轻量级的分布式日志框架,以其简洁、高效和易部署的特性,逐渐受到广大开发者的青睐。本文将详细介绍如何在SpringBoot项目中接入GrayLog,以实现日志的集中管理和分析。
94 1
|
3天前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
30天前
|
数据采集 分布式计算 并行计算
Dask与Pandas:无缝迁移至分布式数据框架
【8月更文第29天】Pandas 是 Python 社区中最受欢迎的数据分析库之一,它提供了高效且易于使用的数据结构,如 DataFrame 和 Series,以及大量的数据分析功能。然而,随着数据集规模的增大,单机上的 Pandas 开始显现出性能瓶颈。这时,Dask 就成为了一个很好的解决方案,它能够利用多核 CPU 和多台机器进行分布式计算,从而有效地处理大规模数据集。
59 1
|
25天前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
33 0
|
27天前
|
缓存 分布式计算 Java
详细解读MapReduce框架中的分布式缓存
【8月更文挑战第31天】
17 0
|
1月前
|
机器学习/深度学习 编译器 PyTorch
自研分布式训练框架EPL问题之吸引社区参与共建如何解决
自研分布式训练框架EPL问题之吸引社区参与共建如何解决
|
1月前
|
并行计算 算法 调度
自研分布式训练框架EPL问题之提高GPU利用率如何解决
自研分布式训练框架EPL问题之提高GPU利用率如何解决
|
1月前
|
算法 异构计算
自研分布式训练框架EPL问题之帮助加速Bert Large模型的训练如何解决
自研分布式训练框架EPL问题之帮助加速Bert Large模型的训练如何解决

热门文章

最新文章