xxl-job的原理(2)—调度中心管理注册信息

简介: xxl-job的原理(2)—调度中心管理注册信息

一、调度中心管理注册信息

1.JobApiController

执行器调用调度中心的url来实现注册、下线、回调等操作;其主要的实现类是JobApiController,调用/api/registry接口注册执行器信息,调用/api/registryRemove接口下线执行器信息,调用/api/callback接口执行回调操作。

@Controller
@RequestMapping("/api")
public class JobApiController {
   
   

    @Resource
    private AdminBiz adminBiz;

    /**
     * api
     *
     * @param uri
     * @param data
     * @return
     */
    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit=false)
    public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
   
   

        // valid
        if (!"POST".equalsIgnoreCase(request.getMethod())) {
   
   
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri==null || uri.trim().length()==0) {
   
   
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
                && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
                && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
   
   
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // services mapping
        if ("callback".equals(uri)) {
   
   
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
   
   
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
   
   
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
   
   
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }

    }

}

2.AdminBizImpl

执行adminBiz.registry(registryParam)是调用实现类AdminBizImpl,在实现类中调用registry(registryParam)来实现。

@Service
public class AdminBizImpl implements AdminBiz {
   
   


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
   
   
        return JobCompleteHelper.getInstance().callback(callbackParamList);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
   
   
        return JobRegistryHelper.getInstance().registry(registryParam);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
   
   
        return JobRegistryHelper.getInstance().registryRemove(registryParam);
    }

}

3.JobRegistryHelper

在AdminBizImpl中的实现也不难理解,通过在初始化start()方法中创建的registryOrRemoveThreadPool线程池中执行异步注册任务,注册信息写入到数据表xxl_job_registry中。

//AdminBizImpl.java

    public void start(){
   
   

        // for registry or remove
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
                2,
                10,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
   
   
                    @Override
                    public Thread newThread(Runnable r) {
   
   
                        return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
   
   
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
   
   
                        r.run();
                        logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                    }
                });

    //...省略

public ReturnT<String> registry(RegistryParam registryParam) {
   
   

   // valid
   if (!StringUtils.hasText(registryParam.getRegistryGroup())
         || !StringUtils.hasText(registryParam.getRegistryKey())
         || !StringUtils.hasText(registryParam.getRegistryValue())) {
   
   
      return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
   }

   // async execute
   //my-异步执行,将注册信息持久化到数据库
   registryOrRemoveThreadPool.execute(new Runnable() {
   
   
      @Override
      public void run() {
   
   
        //my-写入数据库
         int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
         if (ret < 1) {
   
   
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

            // fresh
            freshGroupRegistryInfo(registryParam);
         }
      }
   });

   return ReturnT.SUCCESS;
}

4.总结

  1. 执行器通过restful api形式注册到调度中心来,调度中心JobApiController对应有3个注册、下线和回调的方法实现;
  2. 通过AdminBizImpl的adminBiz.registry(registryParam)来实际执行注册方法,实际使用JobRegistryHelper类;
  3. 在JobRegistryHelper类中在初始化的时候会创建一个线程池,每次注册执行器的时候会创建一个异步线程来将注册信息持久化的数据库;

JobApiController_api

二、调度中心的配置和启动

1.添加权限控制

制定权限注解@PermissionLimit,其实现的逻辑在PermissionInterceptor中,首先判断是否需要鉴权,如果需要则根据cookie中拿到的用户信息查库判断是否有权限登录,如果没有权限则重定向到登录页面或提示没有权限。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PermissionLimit {
   
   

   /**
    * 登录拦截 (默认拦截)
    */
   boolean limit() default true;

   /**
    * 要求管理员权限
    *
    * @return
    */
   boolean adminuser() default false;

}
@Component
public class PermissionInterceptor implements AsyncHandlerInterceptor {
   
   

   @Resource
   private LoginService loginService;

   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
   
   
      //my-处理登录权限的逻辑
      if (!(handler instanceof HandlerMethod)) {
   
   
         return true;   // proceed with the next interceptor
      }

      // if need login
      boolean needLogin = true;
      boolean needAdminuser = false;
      HandlerMethod method = (HandlerMethod)handler;
      PermissionLimit permission = method.getMethodAnnotation(PermissionLimit.class);
      if (permission!=null) {
   
   
         needLogin = permission.limit();
         needAdminuser = permission.adminuser();
      }

      if (needLogin) {
   
   
         XxlJobUser loginUser = loginService.ifLogin(request, response);
         if (loginUser == null) {
   
   
            response.setStatus(302);
            response.setHeader("location", request.getContextPath()+"/toLogin");
            return false;
         }
         if (needAdminuser && loginUser.getRole()!=1) {
   
   
            throw new RuntimeException(I18nUtil.getString("system_permission_limit"));
         }
         request.setAttribute(LoginService.LOGIN_IDENTITY_KEY, loginUser);
      }

      return true;   // proceed with the next interceptor
   }

}

将PermissionInterceptor添加到web配置文件中。

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Resource
    private PermissionInterceptor permissionInterceptor;
    @Resource
    private CookieInterceptor cookieInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(permissionInterceptor).addPathPatterns("/**");
        registry.addInterceptor(cookieInterceptor).addPathPatterns("/**");
    }

}

2.配置中心初始化

xxljob的初始化和销毁动作在XxlJobAdminConfig中配置完成。

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
   
   

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
   
   
        return adminConfig;
    }


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
   
   
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    @Override
    public void destroy() throws Exception {
   
   
        xxlJobScheduler.destroy();
    }
}

具体初始化的操作。

public class XxlJobScheduler  {
   
   
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    public void init() throws Exception {
   
   
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }


    public void destroy() throws Exception {
   
   

        // stop-schedule
        JobScheduleHelper.getInstance().toStop();

        // admin log report stop
        JobLogReportHelper.getInstance().toStop();

        // admin lose-monitor stop
        JobCompleteHelper.getInstance().toStop();

        // admin fail-monitor stop
        JobFailMonitorHelper.getInstance().toStop();

        // admin registry stop
        JobRegistryHelper.getInstance().toStop();

        // admin trigger pool stop
        JobTriggerPoolHelper.toStop();

    }

    // ---------------------- I18n ----------------------

    private void initI18n(){
   
   
        for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
   
   
            item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
        }
    }
目录
相关文章
|
5月前
|
Java 调度 Maven
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目(下)
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目(下)
250 0
|
存储 Java BI
XXL-JOB定时任务知识点和应用实例
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。该处只是介绍xxl_job的一下基础知识和使用的实例,具体的安装调试请参照对应的最新的官方文档,中文开源地址:https://www.xuxueli.com/xxl-job
3491 0
|
2月前
|
存储 监控 算法
XXL-JOB内部机制大揭秘:让任务调度飞起来
【8月更文挑战第14天】在大数据时代,高效的任务调度系统是支撑业务稳定运行与快速迭代的基石。XXL-JOB,作为一款轻量级、分布式任务调度平台,凭借其灵活的配置、强大的扩展性和高可用特性,在众多任务调度框架中脱颖而出。今天,我们就来深入揭秘XXL-JOB的内部机制,看看它是如何让任务调度“飞起来”的。
89 0
|
运维 监控 算法
从定时任务-到任务调度系统xxl-job
定时任务的今生前世以及xxl-job调度系统
2404 0
从定时任务-到任务调度系统xxl-job
|
4月前
|
测试技术 Nacos Docker
xxl任务绑定一台服务器的两个项目
项目需在测试和生产环境自动注册到xxl服务。之前测试环境未注册,且手动注册。解决方案:修改xxl.nacos配置,设置xxl.job.executor.ip为特定IP,避免自动注册错误的IP。因同一机器上运行两个项目,需分配不同端口,如测试环境设为9997,并在docker运行命令中映射该端口。最后在阿里云开放9997端口并重启服务,实现自动注册。
|
5月前
|
Arthas Kubernetes 调度
一次线上Xxl-Job定时任务调度失败的排查与解决
在XXL-JOB系统中,每分钟执行一次的任务出现调度异常:首次调度成功,第二次调度失败,但第三次调度显示的是第二次任务的执行时间。问题源于调度中心发送的心跳请求未得到执行器响应,导致连接被Envoy代理关闭。Envoy在等待心跳响应时,后续调度请求被阻塞,直至连接因超时关闭。调度中心收到503响应后也会关闭连接,从而影响第三次调度。解决方案是更新执行器以处理心跳请求或配置Istio Sidecar以绕过Envoy代理特定流量。
971 0
|
5月前
|
SQL 负载均衡 监控
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目(上)
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目
330 0
|
5月前
|
SQL 调度 数据库
Docker部署Xxl-Job分布式任务调度中心(超详细)
Docker部署Xxl-Job分布式任务调度中心(超详细)
|
5月前
|
负载均衡 Java 调度
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
xxl-job与其他调度框架比较与部署
|
5月前
|
SQL Java 关系型数据库
【极光系列】springBoot集成xxl-job调度器
【极光系列】springBoot集成xxl-job调度器
79 2
下一篇
无影云桌面