自己动手实现分布式任务调度框架(续)(1)

简介: 自己动手实现分布式任务调度框架(续)

  之前写过一篇:自己动手实现分布式任务调度框架本来是用来闲来分享一下自己的思维方式,时至今日发现居然有些人正在使用了,本着对代码负责人的态度,对代码部分已知bug进行了修改,并增加了若干功能,如立即启动,实时停止等功能,新增加的功能会在这一篇做详细的说明。

  提到分布式任务调度,市面上本身已经有一些框架工具可以使用,但是个人觉得功能做的都太丰富,架构都过于复杂,所以才有了我重复造轮子。个人喜欢把复杂的问题简单化,利用有限的资源实现竟可能多的功能。因为有几个朋友问部署方式,这里再次强调下:我的这个服务可以直接打成jar放在自己本地仓库,然后依赖进去,或者直接copy代码过去,当成自己项目的一部分就可以了。也就是说跟随你们自己的项目启动,所以我这里也没有写界面。下面先谈谈怎么基于上次的代码实现任务立即启动吧!

  调度和自己服务整合后部署图抽象成如下:

  用户在前端点击立即请求按钮,通过各种负载均衡软件或者设备,到达某台机器的某个带有本调度框架的服务,然后进行具体的执行,也就是说这个立即启动就是一个最常见最简单的请求,没有过多复杂的问题(比如多节点会不会重复执行这些)。最简单的办法,当用户请求过来直接用一个线程或者线程池执行用户点的那个任务的逻辑代码就行了,当然我这里没有那么粗暴,现有的调度代码资源如下:

package com.rdpaas.task.scheduler;
import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 任务调度器
 * @author rongdi
 * @date 2019-03-13 21:15
*/
@Component
public class TaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
    @Autowired
    private TaskRepository taskRepository;
    @Autowired
    private NodeRepository nodeRepository;
    @Autowired
    private EasyJobConfig config;
    /**
     * 创建任务到期延时队列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();
    /**
     * 可以明确知道最多只会运行2个线程,直接使用系统自带工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);
    /**
     * 正在执行的任务的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();
    /**
     * 声明工作线程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 获取任务的策略
     */
    private Strategy strategy;
    @PostConstruct
    public void init() {
        /**
         * 根据配置选择一个节点获取任务的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,并且等待队列满后
         * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是因为Executors那几个线程工具
         * 各有各的弊端,不适合生产使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 执行待处理任务加载线程
         */
        bossPool.execute(new Loader());
        /**
         * 执行任务调度线程
         */
        bossPool.execute(new Boss());
    
    }
    class Loader implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先获取可用的节点列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找还有指定时间(单位秒)才开始的主任务列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不该自己拿就不要抢
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先设置成待执行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用乐观锁尝试更新状态,如果更新成功,其他节点就不会更新成功。如果其它节点也正在查询未完成的
                         * 任务列表和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不一样了,这里更新
                         * 必然会返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封装成延时对象放入延时队列,这里再查一次是因为上面乐观锁已经更新了版本,会导致后面结束任务更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 时间到了就可以从延时队列拿出任务对象,然后交给worker线程池去执行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正开始执行了设置成执行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader线程中已经使用乐观锁控制了,这里没必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到线程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暂存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }
    }
    class Worker implements Callable<String> {
        private Task task;
        public Worker(Task task) {
            this.task = task;
        }
        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //开始任务
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //执行任务
                task.getInvokor().invoke();
                //完成任务
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 执行完后删了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }
    }
    /**
     * 完成子任务,如果父任务失败了,子任务不会执行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {
        //查看是否有子类任务
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //当没有子任务时完成父任务
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //开始任务
                TaskDetail childDetail = null;
                try {
                    //将子任务状态改成执行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //开始子任务
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用乐观锁更新下状态,不然这里可能和恢复线程产生并发问题
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再从数据库取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //执行子任务
                        childTask.getInvokor().invoke();
                        //完成子任务
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 当有子任务时完成子任务后再完成父任务
             */
            taskRepository.finish(task,detail);
        }
    }
    /**
     * 添加任务
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }
    /**
     * 添加子任务
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }
   
}

  上面主要就是三组线程,Loader负责加载将要执行的任务放入本地的任务队列,Boss线程负责取出任务队列的任务,然后分配Worker线程池的一个线程去执行。由上面的代码可以看到如果要立即执行,其实只需要把一个延时为0的任务放入任务队列,等着Boss线程去取然后分配给worker执行就可以实现了,代码如下:

/**
     * 立即执行任务,就是设置一下延时为0加入任务队列就好了,这个可以外部直接调用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

  启动不用再多说,下面介绍一下停止任务,根据面向对象的思维,用户要想停止一个任务,最终执行停止任务的就是正在执行任务的那个节点。停止任务有两种情况,第一种任务没有正在运行如何停止,第二种是任务正在运行如何停止。第一种其实直接改变一下任务对象的状态为停止就行了,不必多说。下面主要考虑如何停止正在运行的任务,细心的朋友可能已经发现上面代码和之前那一篇代码有点区别,之前用的Runnble作为线程实现接口,这个用了Callable,其实在java中停止线程池中正在运行的线程最常用的就是直接调用future的cancel方法了,要想获取到这个future对象就需要将以前实现Runnbale改成实现Callable,然后提交到线程池由execute改成submit就可以了,然后每次提交到线程池得到的future对象使用taskId一起保存在一个map中,方便根据taskId随时找到。当然任务执行完后要及时删除这个map里的任务,以免常驻其中导致内存溢出。停止任务的请求流程如下

  图还是原来的图,但是这时候情况不一样了,因为停止任务的时候假如当前正在执行这个任务的节点处于服务1,负载均衡是不知道要去把你引到服务1的,他可能会引入到服务2,那就悲剧了,所以通用的做法就是停止请求过来不管落到哪个节点上,那个节点就往一个公用的mq上发一个带有停止任务业务含义的消息,各个节点订阅这个消息,然后判断都判断任务在不在自己这里执行,如果在就执行停止操作。但是这样势必让我们的调度服务又要依赖一个外部的消息队列服务,就算很方便的就可以引入一个外部的消息队列,但是你真的可以驾驭的了吗,消息丢了咋办,重复发送了咋办,消息服务挂了咋办,网络断了咋办,又引入了一大堆问题,那我是不是又要写n篇文章来分别解决这些问题。往往现实却是就是这么残酷,你解决了一个问题,引入了更多的问题,这就是为什么bug永远改不完的道理了。当然这不是我的风格,我的风格是利用有限的资源做尽可能多的事情(可能是由于我工作的企业都是那种资源贫瘠的,养成了我这种习惯,土豪公司的程序员请绕道,哈哈)。

  简化一下问题:目前的问题就是如何让正在执行任务的节点知道,然后停止正在执行的这个任务,其实就是这个停止通知如何实现。这不免让我想起了12306网站上买票,其实我们作为老百姓多么希望12306可以在有票的时候发个短信通知一下我们,然后我们上去抢,但是现实却是,你要么使用软件一直刷,要么是自己隔一段时间上去瞄一下有没有票。如果把有票了给我们发短信通知定义为异步通知,那么这种我们要隔一段时间自己去瞄一下的方式就是同步轮训。这两种方式都能达到告知的目的,关键的区别在于你到底有没有时间去一直去瞄,不过相比于可以回家,这些时间都是值得的。个人认为软件的设计其实就是一个权衡是否值得的过程。如果约定了不使用外部消息队列这种异步通知的方式,那么我们只能使用同步轮训的方式了。不过正好我们的任务调度本身已经有一个心跳机制,没隔一段时间就去更新一下节点状态,如果我们把用户的停止请求作为命令信息更新到每个节点的上,然后随着心跳获取到这个节点的信息,然后判断这个命令,做相应的处理是不是就可以完美解决这个问题。值得吗?很明显是值得的,我们只是在心跳逻辑上加一个小小的副作用就实现了通知功能了。代码如下

package com.rdpaas.task.common;
/**
 * @author rongdi
 * @date 2019/11/26
*/
public enum NotifyCmd {
    //没有通知,默认状态
    NO_NOTIFY(0),
    //开启任务(Task)
    START_TASK(1),
    //修改任务(Task)
    EDIT_TASK(2),
    //停止任务(Task)
    STOP_TASK(3);
    int id;
    NotifyCmd(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    public static NotifyCmd valueOf(int id) {
        switch (id) {
            case 1:
                return START_TASK;
            case 2:
                return EDIT_TASK;
            case 3:
                return STOP_TASK;
            default:
                return NO_NOTIFY;
        }
    }
}
package com.rdpaas.task.handles;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.utils.SpringContextUtil;
/**
 * @author: rongdi
 * @date:
*/
public interface NotifyHandler<T> {
    static NotifyHandler chooseHandler(NotifyCmd notifyCmd) {
        return SpringContextUtil.getByTypeAndName(NotifyHandler.class,notifyCmd.toString());
    }
    public void update(T t);
}
package com.rdpaas.task.handles;
import com.rdpaas.task.scheduler.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author: rongdi
 * @date:
*/
@Component("STOP_TASK")
public class StopTaskHandler implements NotifyHandler<Long> {
    @Autowired
    private TaskExecutor taskExecutor;
    @Override
    public void update(Long taskId) {
        taskExecutor.stop(taskId);
    }
}
class HeartBeat implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try {
                    /**
                     * 时间到了就可以从延时队列拿出节点对象,然后更新时间和序号,
                     * 最后再新建一个超时时间为心跳时间的节点对象放入延时队列,形成循环的心跳
                     */
                    DelayItem<Node> item = heartBeatQueue.take();
                    if(item != null && item.getItem() != null) {
                        Node node = item.getItem();
                        handHeartBeat(node);
                    }
                    heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId())));
                } catch (Exception e) {
                    logger.error("task heart beat error,cause by:{} ",e);
                }
            }
        }
    }
    /**
     * 处理节点心跳
     * @param node
     */
    private void handHeartBeat(Node node) {
        if(node == null) {
            return;
        }
        /**
         * 先看看数据库是否存在这个节点
         * 如果不存在:先查找下一个序号,然后设置到node对象中,最后插入
         * 如果存在:直接根据nodeId更新当前节点的序号和时间
         */
        Node currNode= nodeRepository.getByNodeId(node.getNodeId());
        if(currNode == null) {
            node.setRownum(nodeRepository.getNextRownum());
            nodeRepository.insert(node);
        } else  {
            nodeRepository.updateHeartBeat(node.getNodeId());
            NotifyCmd cmd = currNode.getNotifyCmd();
            String notifyValue = currNode.getNotifyValue();
            if(cmd != null && cmd != NotifyCmd.NO_NOTIFY) {
                /**
                 * 借助心跳做一下通知的事情,比如及时停止正在执行的任务
                 * 根据指令名称查找Handler
                 */
                NotifyHandler handler = NotifyHandler.chooseHandler(currNode.getNotifyCmd());
                if(handler == null || StringUtils.isEmpty(notifyValue)) {
                    return;
                }
                /**
                 * 执行操作
                 */
                handler.update(Long.valueOf(notifyValue));
            }
            
        }
    }


自己动手实现分布式任务调度框架(续)(2)https://developer.aliyun.com/article/1542828

相关文章
|
27天前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
27天前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
3天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
17 2
|
26天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
46 6
|
26天前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
28 6
|
24天前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
23 1
|
1月前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
132 3
|
1月前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
79 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
52 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
52 0
下一篇
DataWorks