设计一个简易版本的分布式任务调度系统

简介: 设计一个简易版本的分布式任务调度系统

简易版分布式任务调度实现


需求分析


事实上,市面上有很多分布式任务调度框架,比如大名鼎鼎的xxl-job,以及各个大厂自研的分布式任务调度框架等,但是说到自研,为什么自研呢?


  • 定制化需求:因为可能有特定的业务需求,需要定制化的分布式任务调度框架来满足自身的业务场景和特定需求。
  • 技术积累:可能在分布式系统和任务调度领域有丰富的技术积累和经验,他们希望通过自研框架来进一步提升技术实力和竞争力。
  • 控制权:自研框架可以让大厂更好地掌控自身的技术发展和业务发展,降低对外部框架的依赖和风险。


针对上述一些特点,那么了解一个框架,或者说自研一个框架,需要怎么做?自研一个,自己做一个简易版,只实现核心功能即可!!!


那么对于实现分布式任务调度来说,除了实现核心功能外,还需要轻便型,最好是,简单配置后就能立马使用,要多简单就多简单,所以简便性也是设计简易版分布式任务调度的核心思路。


设计思路


前面说到了,想要简便性,那么该怎么办?是都在配置文件中配置好?还是怎么样?


其实为了达到简便性,我们可以使用注解的方式,也就是说,我们把想要定时控制的方法上面加个注解,然后它就可以定时执行了,这样最简便了。当了解到了这一点,接下来开始介绍核心架构。

一个容易理解的设计思路一般都是先从图解开始的。


首先扫描其中所有带有 某种注解 的方法,将其注册到注册中心(Zk)中,然后我们的管理后台扫描这些任务在后台页面中显示,最终在利用Zk中watcher的特性,监听某个节点的话,通过其变化,动态的控制任务的启动,修改配置参数等功能。


实现原理


自定义注解


/**
 这段代码定义了一个自定义注解 DcsScheduled,它可以用来标记方法,并指定该方法作为一个 Dcs 调度任务。
 */
@Retention(RetentionPolicy.RUNTIME) // 指定该注解在运行时保留,因此可以通过反射来访问该注解的信息。
@Target(ElementType.METHOD) // 指定该注解只能应用在方法上。
public @interface DcsScheduled {
    String desc() default "缺省"; // 用于描述调度任务的说明,默认取值为"缺省"。
    String cron() default ""; // 指定调度任务的 cron 表达式,用于设置任务的执行时间规则。
    boolean autoStartup() default true; // 指定是否自动启动调度任务,默认为 true。
}
/**
 这段代码定义了一个自定义注解 EnableDcsScheduling,它可以用来在Spring Boot应用中启用Dcs调度功能。
 */
@Target({ElementType.TYPE}) // 指定该注解只能应用在类上。
@Retention(RetentionPolicy.RUNTIME) // 指定该注解在运行时保留,因此可以通过反射来访问该注解的信息。
@Import({DcsSchedulingConfiguration.class}) // 指定在应用中导入 DcsSchedulingConfiguration 类的配置。
@ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class})
//指定在应用中自动导入 SchedulingConfig、CronTaskRegister 和 DoJoinPoint 类的配置。
@ComponentScan("cn.nhs.*") // 指定扫描并加载 cn.nhs 包及其子包下的所有组件。
public @interface EnableDcsScheduling {
}


初始化服务


容器上下文初始化


// 获取上下文将其注入全局上下文中
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Constants.Global.applicationContext = applicationContext;
    }


扫描自定义注解


/**
     postProcessAfterInitialization 方法是在 Spring 容器实例化 Bean 并完成初始化后立即调用的。
     具体来说,它是在 Bean 初始化完成之后、即将返回给调用者之前被调用的。
     相当于Spring生命周期中的钩子函数
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 获取到对应的bean,如果之前已经存在在 set集合中了,相当于被处理过了,那么就直接从set集合中返回。
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (this.nonAnnotatedClasses.contains(targetClass)) return bean;
        // 遍历bean中所有的方法
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            // 去找使用了 @DcsScheduled 注解的方法
            DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
            if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue;
            // 当指定的键 beanName 在 Map 中不存在时,计算一个新的值并将其插入到 Map 中。如果指定的键存在,则直接返回对应的值。
            List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
            ExecOrder execOrder = new ExecOrder();
            execOrder.setBean(bean);
            execOrder.setBeanName(beanName);
            execOrder.setMethodName(method.getName());
            execOrder.setDesc(dcsScheduled.desc());
            execOrder.setCron(dcsScheduled.cron());
            execOrder.setAutoStartup(dcsScheduled.autoStartup());
            execOrderList.add(execOrder);
            this.nonAnnotatedClasses.add(targetClass);
        }
        return bean;
    }


监听容器刷新事件


/**
     实现 ApplicationListener<ContextRefreshedEvent> 接口可以监听 Spring 容器的刷新事件。
     当 Spring 容器启动或刷新时,会触发 ContextRefreshedEvent 事件,从而调用 onApplicationEvent 方法。
     在 onApplicationEvent 方法中,可以编写自己的逻辑来处理容器刷新事件。
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();
            //1. 初始化配置
            init_config(applicationContext);
            //2. 初始化服务
            init_server(applicationContext);
            //3. 启动任务
            init_task(applicationContext);
            //4. 挂载节点
            init_node();
            //5. 心跳监听
            HeartbeatService.getInstance().startFlushScheduleStatus();
            logger.info("schedule init config、server、task、node、heart done!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

初始化配置

//1. 初始化配置
    private void init_config(ApplicationContext applicationContext) {
        try {
            StarterServiceProperties properties = applicationContext.getBean("nhs-schedule-starterAutoConfig", StarterAutoConfig.class).getProperties();
            Constants.Global.zkAddress = properties.getZkAddress();
            Constants.Global.schedulerServerId = properties.getSchedulerServerId();
            Constants.Global.schedulerServerName = properties.getSchedulerServerName();
            InetAddress id = InetAddress.getLocalHost();
            Constants.Global.ip = id.getHostAddress();
        } catch (Exception e) {
            logger.error("middleware schedule init config error!", e);
            throw new RuntimeException(e);
        }
    }

初始化服务

//2. 初始化服务
    private void init_server(ApplicationContext applicationContext) {
        try {
            //获取zk连接
            CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress);
            //节点组装
            // /cn/nhs/schedule/server/schedule-spring-boot-starter-test
            path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
            // /cn/nhs/schedule/server/schedule-spring-boot-starter-test/ip/本机ip地址
            path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);
            //创建节点&递归删除本服务IP下的旧内容
            ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip);
            ZkCuratorServer.createNode(client, path_root_server_ip);
            ZkCuratorServer.setData(client, path_root_server, schedulerServerName);
            //添加节点&监听
            //  /cn/nhs/schedule/exec
            ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec);
            ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec);
        } catch (Exception e) {
            logger.error("schedule init server error!", e);
            throw new RuntimeException(e);
        }
    }

启动服务

//3. 启动任务
    private void init_task(ApplicationContext applicationContext) {
        CronTaskRegister cronTaskRegistrar = applicationContext.getBean("nhs-schedule-cronTaskRegister", CronTaskRegister.class);
        Set<String> beanNames = Constants.execOrderMap.keySet();
        for (String beanName : beanNames) {
            List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
            for (ExecOrder execOrder : execOrderList) {
                if (!execOrder.getAutoStartup()) continue;
                SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
                cronTaskRegistrar.addCronTask(task, execOrder.getCron());
            }
        }
    }

挂载节点

private void init_node() throws Exception {
        Set<String> beanNames = Constants.execOrderMap.keySet();
        for (String beanName : beanNames) {
            List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
            for (ExecOrder execOrder : execOrderList) {
                String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
                String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
                String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
                //添加节点
                ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
                ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
                ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
                //添加节点数据[临时]
                ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
                //添加节点数据[永久]
                ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
            }
        }
    }


节点监听


//所有子节点监听
    public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
        /**
         具体而言,TreeCache是ZooKeeper的一个监听器,可以监控指定节点及其子节点的变化。
         通过创建TreeCache实例并启动它,可以在ZooKeeper中的指定节点上设置监听器,以便在节点发生变化时触发相应的事件。
         启动TreeCache后,它会从指定节点开始递归地缓存其下所有的子节点和数据,并且持续监控这些节点的状态变化。
         */
        TreeCache treeCache = new TreeCache(client, path);
        treeCache.start();
        // 为 treeCache 添加一个监听器,当节点发生变化时,会调用对应的回调函数进行处理。
        treeCache.getListenable().addListener((curatorFramework, event) -> {
            // 这段代码的作用是从 ZooKeeper 的监听事件中解析出一个 Instruct 对象。
            /*
            具体来说,代码首先判断事件中是否包含有效数据,如果没有则直接返回。然后将事件中的数据转换为字节数组,
            再将字节数组转换为字符串,并根据一些条件进行判断,确保该字符串是一个合法的 JSON 格式。
            如果判断失败,则直接返回。若判断成功,则利用 JSON.parseObject() 方法将该 JSON
            字符串解析成一个 Instruct 对象,并返回该对象。
            */
            if (null == event.getData()) return;
            byte[] eventData = event.getData().getData();
            if (null == eventData || eventData.length < 1) return;
            String json = new String(eventData, Constants.Global.CHARSET_NAME);
            if ("".equals(json) || json.indexOf("{") != 0 || json.lastIndexOf("}") + 1 != json.length()) return;
            Instruct instruct = JSON.parseObject(new String(event.getData().getData(), Constants.Global.CHARSET_NAME), Instruct.class);
            // 在回调函数中,判断事件的类型,如果是节点新增或更新,则根据节点中存储的信息以及一些条件进行相应的业务逻辑处理。
            switch (event.getType()) {
                case NODE_ADDED:
                case NODE_UPDATED:
                    // 如果当前本机的ip 与 schedulerServerId 都与 回调返回的相等。
                    if (Constants.Global.ip.equals(instruct.getIp()) &&
                            Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
                        //获取对象
                        CronTaskRegister cronTaskRegistrar = applicationContext.getBean("nhs-schedule-cronTaskRegister", CronTaskRegister.class);
                        boolean isExist = applicationContext.containsBean(instruct.getBeanName());
                        if (!isExist) return;
                        Object scheduleBean = applicationContext.getBean(instruct.getBeanName());
                        // /cn/nhs/schedule/server/schedule-spring-boot-starter-test/ip/机器ip/clazz/类对象名称/method/方法名称/status
                        String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root, Constants.Global.LINE, "server", Constants.Global.LINE, instruct.getSchedulerServerId(), Constants.Global.LINE, "ip", LINE, instruct.getIp(), LINE, "clazz", LINE, instruct.getBeanName(), LINE, "method", LINE, instruct.getMethodName(), "/status");
                        //执行命令 0关闭、1启动、2更新
                        Integer status = instruct.getStatus();
                        switch (status) {
                            case 0: // 关闭
                                cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                                // 重新将状态设置回去
                                setData(client, path_root_server_ip_clazz_method_status, "0");
                                logger.info("schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
                                break;
                            case 1: // 启动
                                cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                                setData(client, path_root_server_ip_clazz_method_status, "1");
                                logger.info("schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
                                break;
                            case 2: // 更新
                                cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                                cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                                setData(client, path_root_server_ip_clazz_method_status, "1");
                                logger.info("schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
                                break;
                        }
                    }
                    break;
                case NODE_REMOVED:
                    break;
                default:
                    break;
            }
        });
    }


并行任务执行


// 添加任务 & 启动任务
    public void addCronTask(SchedulingRunnable task, String cronExpression) {
        // 首先判断是否已经存在这个任务了,如果存在,那么先移除这个任务
        if (null != Constants.scheduledTasks.get(task.taskId())) {
            removeCronTask(task.taskId());
        }
        // 然后再启动
        CronTask cronTask = new CronTask(task, cronExpression);
        Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask));
    }
    private ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        // 线程池去执行任务,然后使用 scheduledTask.future 同步接收
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    // 移除任务
    public void removeCronTask(String taskId) {
        ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId);
        if (scheduledTask == null) return;
        // 其内部调用 ScheduledFuture 的 cancel 方法
        scheduledTask.cancel();
    }


可扩展自定义AOP


@Aspect
@Component("nhs-schedule")
public class DoJoinPoint {
    private Logger logger = LoggerFactory.getLogger(DoJoinPoint.class);
    @Pointcut("@annotation(cn.nhs.schedule.annotation.DcsScheduled)")
    public void aopPoint() {
    }
    // 定义了一个环绕通知(@Around("aopPoint()")),在目标方法执行前后进行拦截和处理
    // 在doRouter方法中,获取目标方法的执行时间,并在执行结束后记录日志
    @Around("aopPoint()")
    public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
        long begin = System.currentTimeMillis();
        Method method = getMethod(jp);
        try {
            return jp.proceed();
        } finally {
            long end = System.currentTimeMillis();
            logger.info("\nschedule method:{}.{} take time(m):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin));
        }
    }
    // getMethod方法用于获取目标方法的Method对象
    private Method getMethod(JoinPoint jp) throws NoSuchMethodException {
        Signature sig = jp.getSignature();
        MethodSignature methodSignature = (MethodSignature) sig;
        return getClass(jp).getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
    }
    // getClass方法用于获取目标对象的Class对象
    private Class<? extends Object> getClass(JoinPoint jp) throws NoSuchMethodException {
        return jp.getTarget().getClass();
    }
}

目前这里的功能并没有扩展,基本只是打印执行耗时,如果需要监听任务执行的详细信息,可以在这里控制。


最终效果


测试


@SpringBootApplication
@EnableDcsScheduling
public class ApiTestApplication {
    public static void main(String[] args) {
        SpringApplication.run(ApiTestApplication.class, args);
    }
}
@Component("demoTaskOne")
public class DemoTaskOne {
    @DcsScheduled(cron = "0/3 * * * * *", desc = "01定时任务执行测试:taskMethod01", autoStartup = false)
    public void taskMethod01() {
        System.out.println("测试定时任务1");
    }
    @DcsScheduled(cron = "0/3 * * * * *", desc = "01定时任务执行测试:taskMethod02", autoStartup = false)
    public void taskMethod02() {
        System.out.println("测试定时任务2");
    }
}


管理后台


@SpringBootApplication
public class ImcApplication extends SpringBootServletInitializer {
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(ImcApplication.class);
    }
    public static void main(String[] args) {
        SpringApplication.run(ImcApplication.class, args);
    }
}


效果


当我们启动时

可以看到,我们启动的任务间隔是3s一次

在测试端日志显示符合我们的规律。


同时也可以做到不修改代码的情况下进行修改计划时间。

也可以实现不关闭服务的前提下关闭任务


总结


其实核心在于zk的watcher进行监听指定节点的变化,而通过管理后台的每次启动或者暂停,都将要改变节点的信息赋值到正在监听的节点,然后正在监听的节点发现指定节点发生变化,进行回调然后执行后续的全部动作,这也就是简易版本的分布式任务调度框架的时间了。


项目地址


schedule-springboot-starter-main: 实现分布式任务调度中间件,能够动态的开启、关闭任务,并且可以动态的修改参数 (gitee.com)

schedule-springboot-starter-test:实现分布式任务调度中间件的测试 (gitee.com)

schedule-springboot-controller:实现分布式任务调度中间件的后台管理 (gitee.com)

目录
相关文章
|
6天前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
48 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
25天前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
41 3
|
1月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
1月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
1月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
63 2
|
21天前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
2月前
|
存储 块存储
ceph分布式存储系统常见术语篇
关于Ceph分布式存储系统的常见术语解释和概述。
95 1
ceph分布式存储系统常见术语篇
|
1月前
|
存储 分布式计算 监控
C# 创建一个分布式文件存储系统需要怎么设计??
C# 创建一个分布式文件存储系统需要怎么设计??
29 0
|
3月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
3月前
|
机器学习/深度学习 分布式计算 PyTorch
构建可扩展的深度学习系统:PyTorch 与分布式计算
【8月更文第29天】随着数据量和模型复杂度的增加,单个GPU或CPU已无法满足大规模深度学习模型的训练需求。分布式计算提供了一种解决方案,能够有效地利用多台机器上的多个GPU进行并行训练,显著加快训练速度。本文将探讨如何使用PyTorch框架实现深度学习模型的分布式训练,并通过一个具体的示例展示整个过程。
138 0

热门文章

最新文章