仿写@ScheduleLock 定时任务

简介: 最近公司在搞分布式的定时任务, 怎么满足分布式的定时任务锁。 我看了大量的开源的代码。 https://github.com/lukas-krecan/ShedLock 感觉老外写的非常的不错。

最近公司在搞分布式的定时任务, 怎么满足分布式的定时任务锁。 我看了大量的开源的代码。 https://github.com/lukas-krecan/ShedLock 感觉老外写的非常的不错。

其实底层也就是分布式锁+aop 的切片来实现的。那既然别人也能实现。 那我们也可以的。 这里分布式锁我们使用zookeeper 来实现。具体的客户端我使用zookeeper 的curator 来实现。 官网地址 http://curator.apache.org/getting-started.html , 写的比较详细,清晰。 分布式锁也有好几种形式。 下面我们就来写代码实现一下。

首先安装zookeeper

  1. 我们看到了是一个空的客户端, 现在已经连接上了。

1.png
3, 新建一个maven Springboot工程 导入curator 的客户端pom.xml 文件

  1. 编写客户端的配置类

@Component
public class ZkClientConfig {

 // 从配置文件拿取zk 的连接
 @Value("${zk.url}")
 private String zkConnection;

 private static CuratorFramework client;

 @PostConstruct
 public void startClient(){
     // 100 毫秒从新连接三次。
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     client = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
     client.start();
 }

}

要是看见出现这种,就说明连接成功了。
2.png

在编写可重入锁的service

package com.example.demo.service;

import com.example.demo.config.ZkClientConfig;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

@Service
public class ZkClientService {

private static  InterProcessMutex lock;

/**
 * 尝试获取锁
 * @param time
 * @param timeUnit
 * @return
 */
public boolean lock(long time , TimeUnit timeUnit,String lockPath){
      try {
          lock = new InterProcessMutex(ZkClientConfig.client, lockPath);

       return    lock.acquire(time,timeUnit);
      }catch (Exception e){
          return  false;
      }
}

/**
 *  创建父节点和子节点
 * @param lockPath
 * @throws Exception
 */
public void create(String lockPath) throws Exception {
    ZkClientConfig.client.create().forPath(lockPath,"test".getBytes());

}

/**/ 遍历路径下面的节点

public int SizeForPath(String path) throws Exception {
   return ZkClientConfig.client.getChildren().forPath(path).size();
}

/**/ 判断路径是否存在
public Boolean isExist(String path) throws Exception {
    return !ObjectUtils.isEmpty(ZkClientConfig.client.checkExists().forPath(path));
}



/**
 * 尝试获取锁
 * @return
 */
public void  lockNotTime(String lockPath){
    try {
        lock = new InterProcessMutex(ZkClientConfig.client, lockPath);
        lock.acquire();
    }catch (Exception e){
        e.printStackTrace();
    }
}


/**
 *  释放锁
 */
public  void releaseLock(){
    try {
        lock.release();
    }catch (Exception e){
        e.printStackTrace();
    }
}

}

启动项目,调用初始化lock 的方法 看一下zk 的界面
3.png
4.png

但是这个节点释放锁就会消失, 关闭zk 也会消失。说明是临时的。 根据后缀来看应该是顺序的节点。

好了, 下面我们就来自定义注解了。

@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface DCScheduleLock {

/**
 *  定时任务的名称,其实也就是zk的节点,具有唯一性,不能重复。
 * @return
 */
String name() ;

/**
 *  锁的时长
 * @return
 */
long time();

/**
 * 锁时长单位
 * @return
 */
TimeUnit unit();

}

在写一下AOP切片

@Component
@Aspect
public class SchedulelockAop {

// path 路径前缀
private final static String PATH_PREFIX = "/";

// 切片的注解
@Pointcut("@annotation(com.example.demo.annotation.DCScheduleLock)")
public void dcScheduleLock() {

}

@Autowired
private ZkClientService zkClientService;

//阻塞多长时间

private static ThreadLocal<Long> threadLocalLong = new ThreadLocal<>();

// 判断单个还是多个实列, 单个实例不走延迟
private static ThreadLocal<Boolean> single = new ThreadLocal<>();



@Around( value = "dcScheduleLock()")
public Object lockAround(ProceedingJoinPoint joinPoint) throws Throwable {
    MethodSignature signature = (MethodSignature)joinPoint.getSignature();
    Method method = signature.getMethod();
    DCScheduleLock dcScheduleLock = AnnotationUtils.findAnnotation(method, DCScheduleLock.class);
    Scheduled scheduled = AnnotationUtils.findAnnotation(method, Scheduled.class);
    if(!ObjectUtils.isEmpty(scheduled)) {
       // 判断是否存在根路径,不存在就创建
        if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name())){
            zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name());
        }
        //  判断当前根路径,也就是子路径的个数
        int size =  zkClientService.SizeForPath(PATH_PREFIX+method+dcScheduleLock.name());
        // 判断是单个实列还是多个
        if(size<2){
            if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()))){
                zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()));
            }
            single.set(true);
        }else {
            single.set(false);
        }
        System.out.println("实例的个数: "+size);
        Object obj = null;
        System.out.println(" 线程id "+ Thread.currentThread().getId());
        threadLocalLong.set(timeToLong(dcScheduleLock.unit(),dcScheduleLock.time(),scheduled.cron()));
        if(zkClientService.lock(dcScheduleLock.time(),dcScheduleLock.unit() ,PATH_PREFIX+ dcScheduleLock.name())){
             // 抢到锁就执行方法,没有就不做任何处理。
             obj = joinPoint.proceed();
        }
        return  obj;
         }else {
        System.out.println( Thread.currentThread().getName()+ "线程没有获取锁 ");
        return joinPoint.proceed();
    }
}

// 后置处理器,获取阻塞的时间才能释放锁
@After(value = "dcScheduleLock()")
public void after(JoinPoint joinPoint) throws InterruptedException {
    // 单室例节点不阻塞
    if(!single.get()){
        Thread.sleep(threadLocalLong.get());
        threadLocalLong.remove();
    }
   //切记一定要释放锁。 
    zkClientService.releaseLock();
    System.out.println( Thread.currentThread().getName()+ "线程释放锁成功 ");

}


/**
 *  这里先写死就是五秒钟, 其实是拿到cron  和 所得时间比较, 取最大的时间
 * @param timeUnit
 * @param time
 * @return
 */
public  long timeToLong(TimeUnit timeUnit,long time ,String cron){
    return 1000*5;
}

}

在写一下测试代码:

@Scheduled(cron = "0/5 ?")
@DCScheduleLock(name = "test",time =2,unit = TimeUnit.SECONDS)
public void test(){

System.out.println(" current  thread --->>>>"+ new Date());

}

单个机器测试:
5.png

没问题,两台机器测试我们看看啊。
6.png

这里的实列也是用节点来标识的,路径是schedulLock 的name 和方法名称。

看一下完美解决了, 一共两个实列。

后面还没写完,实列不应该是持久化的节点,也应该是临时的。

最后看一下zk

7.png

里面存放的使我们两个实列。

具体流程图
8.png

相关文章
|
6月前
|
缓存 Java 调度
使用scheduleAtFixedRate进行定时任务调度
使用scheduleAtFixedRate进行定时任务调度
|
8月前
|
消息中间件 安全 Java
一起来探究@Schedule定时任务在分布式产生的问题
一起来探究@Schedule定时任务在分布式产生的问题
419 0
springboot工程如何设置定时任务详解(@Scheduled)以及cron表达式
springboot工程如何设置定时任务详解(@Scheduled)以及cron表达式
|
Java 调度 Spring
SpringMVC定时任务注解实现@Schedule【良心文章】
SpringMVC定时任务注解实现@Schedule【良心文章】
298 0
SpringMVC定时任务注解实现@Schedule【良心文章】
|
存储 Java API
调度线程池ScheduledThreadPoolExecutor源码解析
调度线程池ScheduledThreadPoolExecutor源码解析
161 0
调度线程池ScheduledThreadPoolExecutor源码解析
|
Java
使用ScheduledExecutorService线程池创建定时任务
使用ScheduledExecutorService线程池创建定时任务
259 0
|
Python
Python编程:定时任务apscheduler框架
Python编程:定时任务apscheduler框架
293 0
Python编程:定时任务apscheduler框架
|
数据可视化 Linux Python
Schedule | 轻量化的定时任务模块
Schedule | 轻量化的定时任务模块
397 0
|
存储 NoSQL 关系型数据库