最近公司在搞分布式的定时任务, 怎么满足分布式的定时任务锁。 我看了大量的开源的代码。 https://github.com/lukas-krecan/ShedLock 感觉老外写的非常的不错。
其实底层也就是分布式锁+aop 的切片来实现的。那既然别人也能实现。 那我们也可以的。 这里分布式锁我们使用zookeeper 来实现。具体的客户端我使用zookeeper 的curator 来实现。 官网地址 http://curator.apache.org/getting-started.html , 写的比较详细,清晰。 分布式锁也有好几种形式。 下面我们就来写代码实现一下。
首先安装zookeeper
- 我们看到了是一个空的客户端, 现在已经连接上了。
3, 新建一个maven Springboot工程 导入curator 的客户端pom.xml 文件
- 编写客户端的配置类
@Component
public class ZkClientConfig {
// 从配置文件拿取zk 的连接
@Value("${zk.url}")
private String zkConnection;
private static CuratorFramework client;
@PostConstruct
public void startClient(){
// 100 毫秒从新连接三次。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
client.start();
}
}
要是看见出现这种,就说明连接成功了。
在编写可重入锁的service
package com.example.demo.service;
import com.example.demo.config.ZkClientConfig;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
@Service
public class ZkClientService {
private static InterProcessMutex lock;
/**
* 尝试获取锁
* @param time
* @param timeUnit
* @return
*/
public boolean lock(long time , TimeUnit timeUnit,String lockPath){
try {
lock = new InterProcessMutex(ZkClientConfig.client, lockPath);
return lock.acquire(time,timeUnit);
}catch (Exception e){
return false;
}
}
/**
* 创建父节点和子节点
* @param lockPath
* @throws Exception
*/
public void create(String lockPath) throws Exception {
ZkClientConfig.client.create().forPath(lockPath,"test".getBytes());
}
/**/ 遍历路径下面的节点
public int SizeForPath(String path) throws Exception {
return ZkClientConfig.client.getChildren().forPath(path).size();
}
/**/ 判断路径是否存在
public Boolean isExist(String path) throws Exception {
return !ObjectUtils.isEmpty(ZkClientConfig.client.checkExists().forPath(path));
}
/**
* 尝试获取锁
* @return
*/
public void lockNotTime(String lockPath){
try {
lock = new InterProcessMutex(ZkClientConfig.client, lockPath);
lock.acquire();
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 释放锁
*/
public void releaseLock(){
try {
lock.release();
}catch (Exception e){
e.printStackTrace();
}
}
}
启动项目,调用初始化lock 的方法 看一下zk 的界面
但是这个节点释放锁就会消失, 关闭zk 也会消失。说明是临时的。 根据后缀来看应该是顺序的节点。
好了, 下面我们就来自定义注解了。
@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface DCScheduleLock {
/**
* 定时任务的名称,其实也就是zk的节点,具有唯一性,不能重复。
* @return
*/
String name() ;
/**
* 锁的时长
* @return
*/
long time();
/**
* 锁时长单位
* @return
*/
TimeUnit unit();
}
在写一下AOP切片
@Component
@Aspect
public class SchedulelockAop {
// path 路径前缀
private final static String PATH_PREFIX = "/";
// 切片的注解
@Pointcut("@annotation(com.example.demo.annotation.DCScheduleLock)")
public void dcScheduleLock() {
}
@Autowired
private ZkClientService zkClientService;
//阻塞多长时间
private static ThreadLocal<Long> threadLocalLong = new ThreadLocal<>();
// 判断单个还是多个实列, 单个实例不走延迟
private static ThreadLocal<Boolean> single = new ThreadLocal<>();
@Around( value = "dcScheduleLock()")
public Object lockAround(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
DCScheduleLock dcScheduleLock = AnnotationUtils.findAnnotation(method, DCScheduleLock.class);
Scheduled scheduled = AnnotationUtils.findAnnotation(method, Scheduled.class);
if(!ObjectUtils.isEmpty(scheduled)) {
// 判断是否存在根路径,不存在就创建
if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name())){
zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name());
}
// 判断当前根路径,也就是子路径的个数
int size = zkClientService.SizeForPath(PATH_PREFIX+method+dcScheduleLock.name());
// 判断是单个实列还是多个
if(size<2){
if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()))){
zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()));
}
single.set(true);
}else {
single.set(false);
}
System.out.println("实例的个数: "+size);
Object obj = null;
System.out.println(" 线程id "+ Thread.currentThread().getId());
threadLocalLong.set(timeToLong(dcScheduleLock.unit(),dcScheduleLock.time(),scheduled.cron()));
if(zkClientService.lock(dcScheduleLock.time(),dcScheduleLock.unit() ,PATH_PREFIX+ dcScheduleLock.name())){
// 抢到锁就执行方法,没有就不做任何处理。
obj = joinPoint.proceed();
}
return obj;
}else {
System.out.println( Thread.currentThread().getName()+ "线程没有获取锁 ");
return joinPoint.proceed();
}
}
// 后置处理器,获取阻塞的时间才能释放锁
@After(value = "dcScheduleLock()")
public void after(JoinPoint joinPoint) throws InterruptedException {
// 单室例节点不阻塞
if(!single.get()){
Thread.sleep(threadLocalLong.get());
threadLocalLong.remove();
}
//切记一定要释放锁。
zkClientService.releaseLock();
System.out.println( Thread.currentThread().getName()+ "线程释放锁成功 ");
}
/**
* 这里先写死就是五秒钟, 其实是拿到cron 和 所得时间比较, 取最大的时间
* @param timeUnit
* @param time
* @return
*/
public long timeToLong(TimeUnit timeUnit,long time ,String cron){
return 1000*5;
}
}
在写一下测试代码:
@Scheduled(cron = "0/5 ?")
@DCScheduleLock(name = "test",time =2,unit = TimeUnit.SECONDS)
public void test(){
System.out.println(" current thread --->>>>"+ new Date());
}
单个机器测试:
没问题,两台机器测试我们看看啊。
这里的实列也是用节点来标识的,路径是schedulLock 的name 和方法名称。
看一下完美解决了, 一共两个实列。
后面还没写完,实列不应该是持久化的节点,也应该是临时的。
最后看一下zk
里面存放的使我们两个实列。
具体流程图