分布式锁—7.Curator的分布式锁

简介: 本文详细解析了Apache Curator库中多种分布式锁的实现机制,包括可重入锁、非可重入锁、可重入读写锁、MultiLock和Semaphore。可重入锁通过InterProcessMutex实现,支持同一线程多次加锁,锁的获取和释放通过Zookeeper的临时顺序节点实现。非可重入锁InterProcessSemaphoreMutex基于Semaphore实现,确保同一时间只有一个线程获取锁。可重入读写锁InterProcessReadWriteLock通过组合读锁和写锁实现,支持读写分离。Multi

大纲

1.Curator的可重入锁的源码

2.Curator的非可重入锁的源码

3.Curator的可重入读写锁的源码

4.Curator的MultiLock源码

5.Curator的Semaphore源码

 

1.Curator的可重入锁的源码

(1)InterProcessMutex获取分布式锁

(2)InterProcessMutex的初始化

(3)InterProcessMutex.acquire()尝试获取锁

(4)LockInternals.attemptLock()尝试获取锁

(5)不同客户端线程获取锁时的互斥实现

(6)同一客户端线程可重入加锁的实现

(7)客户端线程释放锁的实现

(8)客户端线程释放锁后其他线程获取锁的实现

(9)InterProcessMutex就是一个公平锁

 

(1)InterProcessMutex获取分布式锁

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", 
            5000, 
            3000, 
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");
        
        //获取分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");
        lock.acquire();
        Thread.sleep(1000);
        lock.release();
    }
}

(2)InterProcessMutex的初始化

设置锁的节点路径basePath + 初始化一个LockInternals对象实例。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    ...
    public InterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }
    
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, LOCK_NAME, 1, driver);
    }
    
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //1.设置锁的节点路径
        basePath = PathUtils.validatePath(path);
        //2.初始化一个LockInternals对象实例
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
}
public class LockInternals {
    private final LockInternalsDriver driver;
    private final String lockName;
    private volatile int maxLeases;
    private final WatcherRemoveCuratorFramework client;
    private final String basePath;
    private final String path;
    ...
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.lockName = lockName;
        this.maxLeases = maxLeases;
        this.client = client.newWatcherRemoveCuratorFramework();
        this.basePath = PathUtils.validatePath(path);
        this.path = ZKPaths.makePath(path, lockName);
    }
    ...
}

(3)InterProcessMutex.acquire()尝试获取锁

LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。

 

在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    //一个线程对应一个LockData数据对象
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //设置锁的路径
        basePath = PathUtils.validatePath(path);
        //初始化LockInternals
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
    
    @Override
    public void acquire() throws Exception {
        //获取分布式锁,会一直阻塞等待直到获取成功
        //相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            //可重入计算
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if (lockPath != null) {
            //获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
            LockData newLockData = new LockData(currentThread, lockPath);
            //然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
    
    //LockData是InterProcessMutex的一个静态内部类
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
    
    protected byte[] getLockNodeBytes() {
        return null;
    }
    ...
}

(4)LockInternals.attemptLock()尝试获取锁

先创建临时节点,再判断是否满足获取锁的条件。

 

步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。

 

步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。

 

具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。

 

获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。

 

获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。

 

所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。

public class LockInternals {
    private final LockInternalsDriver driver;
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
        ...
    }
    ...
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        //获取当前时间
        final long startMillis = System.currentTimeMillis();
        //默认情况下millisToWait=null
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //默认情况下localLockNodeBytes也是null
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
     
        String ourPath = null;
        boolean hasTheLock = false;//是否已经获取到锁
        boolean isDone = false;//是否正在获取锁
        while (!isDone) {
            isDone = true;
            //1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //2.检查是否获取到了锁
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
    
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    
    List<String> getSortedChildren() throws Exception {
        //获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        return getSortedChildren(client, basePath, lockName, driver);
    }
    
    public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
        //获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        List<String> children = client.getChildren().forPath(basePath);
        //对节点名称进行排序
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort(
            sortedList,
            new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
    ...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //级联创建一个临时顺序节点
    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        //默认情况下传入的lockNodeBytes=null
        if (lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            //创建临时顺序节点
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(5)不同客户端线程获取锁时的互斥实现

maxLeases代表了同时允许多少个客户端可以获取到锁,默认值是1。能否获取锁的判断就是:线程创建的节点的位置outIndex < maxLeases。当线程1创建的节点在节点列表中排第一时,满足outIndex = 0 < maxLeases = 1,可以获取锁。当线程2创建的节点再节点列表中排第二时,不满足outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。

 

(6)同一客户端线程可重入加锁的实现

客户端线程重复获取锁时,会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中:线程第一次获取锁成功会创建一个LockData对象,并存放在一个Map中。线程第二次获取锁时,便会从这个Map中取出这个LockData对象,并对LockData对象中的重入计数器lockCount进行递增,接着就返回true。以此实现可重入加锁。

 

(7)客户端线程释放锁的实现

客户端线程释放锁时会调用InterProcessMutex的release()方法。

 

首先对LockData里的重入计数器进行递减。当重入计数器大于0时,直接返回。当重入计数器为0时才执行下一步删除节点的操作。

 

然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    @Override
    public void release() throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData对象
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //1.首先对LockData里的重入计数器lockCount进行递减
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            //当重入计数器大于0时,直接返回
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            //2.当重入计数器为0时执行删除节点的操作
            internals.releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }
    ...
}
public class LockInternals {
    ...
    final void releaseLock(String lockPath) throws Exception {
        client.removeWatchers();
        revocable.set(null);
        deleteOurPath(lockPath);
    }
    
    private void deleteOurPath(String ourPath) throws Exception {
        //删除节点
        client.delete().guaranteed().forPath(ourPath);
    }
    ...
}

(8)客户端线程释放锁后其他线程获取锁的实现

由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。

public class LockInternals {
    ...
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    ...
}

(9)InterProcessMutex就是一个公平锁

因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。

 

2.Curator的非可重入锁的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

 

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");
        //非可重入锁
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");
        lock.acquire();
        Thread.sleep(3000);
        lock.release();
    }
}

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

Curator的非可重入锁是基于Semaphore来实现的,也就是将Semaphore允许获取Lease的客户端线程数设置为1,从而实现同一时间只能有一个客户端线程获取到Lease。

public class InterProcessSemaphoreMutex implements InterProcessLock {
    private final InterProcessSemaphoreV2 semaphore;
    private final WatcherRemoveCuratorFramework watcherRemoveClient;
    private volatile Lease lease;
    public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
        watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
        this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
    }
    @Override
    public void acquire() throws Exception {
        //获取非可重入锁就是获取Semaphore的Lease
        lease = semaphore.acquire();
    }
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Lease acquiredLease = semaphore.acquire(time, unit);
        if (acquiredLease == null) {
            return false;
        }
        lease = acquiredLease;
        return true;
    }
    @Override
    public void release() throws Exception {
        //释放非可重入锁就是释放Semaphore的Lease
        Lease lease = this.lease;
        Preconditions.checkState(lease != null, "Not acquired");
        this.lease = null;
        lease.close();
        watcherRemoveClient.removeWatchers();
    }
}

 

3.Curator的可重入读写锁的源码

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

(3)InterProcessMutex获取锁的源码

(4)先获取读锁 + 后获取读锁的情形分析

(5)先获取读锁 + 后获取写锁的情形分析

(6)先获取写锁 + 后获取读锁的情形分析

(7)先获取写锁 + 再获取写锁的情形分析

 

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");
        //读写锁
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks");
        lock.readLock().acquire();
        lock.readLock().release();
        lock.writeLock().acquire();
        lock.writeLock().release();
    }
}

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

读锁和写锁都是基于可重入锁InterProcessMutex的子类来实现的。读锁和写锁的获取锁和释放锁逻辑,就是使用InterProcessMutex的逻辑。

public class InterProcessReadWriteLock {
    private final InterProcessMutex readMutex;//读锁
    private final InterProcessMutex writeMutex;//写锁
    //must be the same length. LockInternals depends on it
    private static final String READ_LOCK_NAME  = "__READ__";
    private static final String WRITE_LOCK_NAME = "__WRIT__";
    ...
    //InterProcessReadWriteLock的初始化
    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
        lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
        //写锁的初始化
        writeMutex = new InternalInterProcessMutex(
            client,
            basePath,
            WRITE_LOCK_NAME,//写锁的lockName='__WRIT__'
            lockData,
            1,//写锁的maxLeases
            new SortingLockInternalsDriver() {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                    return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
                }
            }
        );
        //读锁的初始化
        readMutex = new InternalInterProcessMutex(
            client,
            basePath,
            READ_LOCK_NAME,//读锁的lockName='__READ__'
            lockData,
            Integer.MAX_VALUE,//读锁的maxLeases
            new SortingLockInternalsDriver() {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                    return readLockPredicate(children, sequenceNodeName);
                }
            }
        );
    }
    
    private static class InternalInterProcessMutex extends InterProcessMutex {
        private final String lockName;
        private final byte[] lockData;
        InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {
            super(client, path, lockName, maxLeases, driver);
            this.lockName = lockName;
            this.lockData = lockData;
        }
         ...
    }
    
    public InterProcessMutex readLock() {
        return readMutex;
    }
    
    public InterProcessMutex writeLock() {
        return writeMutex;
    }
    ...
}

(3)InterProcessMutex获取锁的源码

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    //一个线程对应一个LockData数据对象
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //设置锁的路径
        basePath = PathUtils.validatePath(path);
        //初始化LockInternals
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
    
    @Override
    public void acquire() throws Exception {
        //获取分布式锁,会一直阻塞等待直到获取成功
        //相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            //可重入计算
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if (lockPath != null) {
            //获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
            LockData newLockData = new LockData(currentThread, lockPath);
            //然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
    
    //LockData是InterProcessMutex的一个静态内部类
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
    
    protected byte[] getLockNodeBytes() {
        return null;
    }
    ...
}
public class LockInternals {
    private final LockInternalsDriver driver;
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
        ...
    }
    ...
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        //获取当前时间
        final long startMillis = System.currentTimeMillis();
        //默认情况下millisToWait=null
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //默认情况下localLockNodeBytes也是null
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
     
        String ourPath = null;
        boolean hasTheLock = false;//是否已经获取到锁
        boolean isDone = false;//是否正在获取锁
        while (!isDone) {
            isDone = true;
            //1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //2.检查是否获取到了锁
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
    
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    
    List<String> getSortedChildren() throws Exception {
        //获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        return getSortedChildren(client, basePath, lockName, driver);
    }
    
    public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
        //获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        List<String> children = client.getChildren().forPath(basePath);
        //对节点名称进行排序
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort(
            sortedList,
            new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
    ...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //级联创建一个临时顺序节点
    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        //默认情况下传入的lockNodeBytes=null
        if (lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            //创建临时顺序节点
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(4)先获取读锁 + 后获取读锁的情形分析

当线程创建完临时顺序节点,并获取到排好序的节点列表children后,执行LockInternalsDriver的getsTheLock()方法获取能否成功加锁的信息时,会执行到InterProcessReadWriteLock的readLockPredicate()方法。

 

由于此时firstWriteIndex = Integer.MAX_VALUE,所以无论多少线程尝试获取读锁,都能满足ourIndex < firstWriteIndex,也就是getsTheLock的值会为true,即表示可以获取读锁。

 

所以读读不互斥。

public class InterProcessReadWriteLock {
    ...
    //sequenceNodeName是当前线程创建的临时顺序节点的路径名称
    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
        if (writeMutex.isOwnedByCurrentThread()) {
            return new PredicateResults(null, true);
        }
        int index = 0;
        int firstWriteIndex = Integer.MAX_VALUE;
        int ourIndex = -1;
        for (String node : children) {
            if (node.contains(WRITE_LOCK_NAME)) {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            } else if (node.startsWith(sequenceNodeName)) {
                //找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
                ourIndex = index;
                break;
            }
            ++index;
        }
        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = (ourIndex < firstWriteIndex);
        String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(5)先获取读锁 + 后获取写锁的情形分析

一.假设客户端线程1首先成功获取了读锁

那么在/locks目录下,此时已经有了如下这个读锁的临时顺序节点。

/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004

二.然后另一个客户端线程2过来尝试获取写锁

于是该线程2会也会先在/locks目录下创建出如下写锁的临时顺序节点:

/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]

然后会执行StandardLockInternalsDriver的getsTheLock()方法。由于初始化写锁时,设置了其maxLeases是1,而在StandardLockInternalsDriver的getsTheLock()方法中,判断线程能成功获取写锁的依据是:ourIndex < maxLeases。即如果要成功获取写锁,那么线程创建的节点在子节点列表里必须排第一。

 

而此时,由于之前已有线程获取过一个读锁,而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表children里排第二,ourIndex是1。所以index = 1 < maxLeases = 1,条件不成立。

 

因此,此时客户端线程2获取写锁失败。于是该线程便会给前一个节点添加一个监听器,并调用wait()方法把自己挂起。如果前面一个节点被删除释放了锁,那么该线程就会被唤醒,从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是,那么就表示获取写锁成功。

public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(6)先获取写锁 + 后获取读锁的情形分析

一.假设客户端线程1先获取了写锁

那么在/locks目录下,此时已经有了如下这个写锁的临时顺序节点。

/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006

二.然后另一个客户端线程2过来尝试获取读锁

于是该线程2会也会先在/locks目录下创建出如下读锁的临时顺序节点:

/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]

然后会执行LockInternalsDriver的getsTheLock()方法获取能否加锁的信息,也就是会执行InterProcessReadWriteLock的readLockPredicate()方法。

public class InterProcessReadWriteLock {
    ...
    //sequenceNodeName是当前线程创建的临时顺序节点的路径名称
    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
        //如果是同一个客户端线程,先加写锁,再加读锁,是可以成功的,不会互斥
        if (writeMutex.isOwnedByCurrentThread()) {
            return new PredicateResults(null, true);
        }
        int index = 0;
        int firstWriteIndex = Integer.MAX_VALUE;
        int ourIndex = -1;
        for (String node : children) {
            if (node.contains(WRITE_LOCK_NAME)) {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            } else if (node.startsWith(sequenceNodeName)) {
                //找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
                ourIndex = index;
                break;
            }
            ++index;
        }
        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = (ourIndex < firstWriteIndex);
        String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

在InterProcessReadWriteLock的readLockPredicate()方法中,如果是同一个客户端线程,先获取写锁,再获取读锁,是不会互斥的。如果是不同的客户端线程,线程1先获取写锁,线程2再获取读锁,则互斥。因为线程2执行readLockPredicate()方法在遍历子节点列表(children)时,如果在子节点列表(children)中发现了一个写锁,会设置firstWriteIndex=0。而此时线程2创建的临时顺序节点的ourIndex=1,所以不满足ourIndex(1) < firstWriteIndex(0),于是线程2获取读锁失败。

 

总结,获取读锁时,在当前线程创建的节点前面:如果还有写锁对应的节点,那么firstWriteIndex就会被重置为具体位置。如果没有写锁对应的节点,那么firstWriteIndex就是MAX_VALUE。而只要firstWriteIndex为MAX_VALUE,那么就可以不断允许获取读锁。

 

(7)先获取写锁 + 再获取写锁的情形分析

如果客户端线程1先获取了写锁,然后后面客户端线程2来获取这个写锁。此时线程2会发现自己创建的节点排在节点列表中的第二,不是第一。于是获取写锁失败,进行阻塞挂起。等线程1释放了写锁后,才会唤醒线程2继续尝试获取写锁。

 

4.Curator的MultiLock源码

(1)Curator的MultiLock的使用

(2)Curator的MultiLock的源码

 

(1)Curator的MultiLock的使用

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");
        //MultiLock
        InterProcessLock lock1 = new InterProcessMutex(client, "/locks/lock_01");
        InterProcessLock lock2 = new InterProcessMutex(client, "/locks/lock_02");
        InterProcessLock lock3 = new InterProcessMutex(client, "/locks/lock_03");
        List<InterProcessLock> locks = new ArrayList<InterProcessLock>();
        locks.add(lock1);
        locks.add(lock2);
        locks.add(lock3);
        InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);
    }
}

(2)Curator的MultiLock的源码

MultiLock原理:依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回true。如果过程中有报错,依次释放已经获取到的锁,然后返回false。

public class InterProcessMultiLock implements InterProcessLock {
    private final List<InterProcessLock> locks;
    public InterProcessMultiLock(List<InterProcessLock> locks) {
        this.locks = ImmutableList.copyOf(locks);
    }
    
    //获取锁
    @Override
    public void acquire() throws Exception {
        acquire(-1, null);
    }
    
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Exception exception = null;
        List<InterProcessLock> acquired = Lists.newArrayList();
        boolean success = true;
        //依次遍历获取每个锁,阻塞直到获取每个锁为止
        for (InterProcessLock lock : locks) {
            try {
                if (unit == null) {
                    lock.acquire();
                    acquired.add(lock);
                } else  {
                    if (lock.acquire(time, unit)) {
                        acquired.add(lock);
                    } else {
                        success = false;
                        break;
                    }
                }
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                success = false;
                exception = e;
            }
        }
        if (!success) {
            for (InterProcessLock lock : reverse(acquired)) {
                try {
                    lock.release();
                } catch (Exception e) {
                    ThreadUtils.checkInterrupted(e);
                    // ignore
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
        return success;
    }
    
    @Override
    public synchronized void release() throws Exception {
        Exception baseException = null;
        for (InterProcessLock lock : reverse(locks)) {
            try {
                lock.release();
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                if (baseException == null) {
                    baseException = e;
                } else {
                    baseException = new Exception(baseException);
                }
            }
        }
        if (baseException != null) {
            throw baseException;
        }
    }
    ...
}

 

5.Curator的Semaphore源码

(1)基于InterProcessSemaphoreV2使用Semaphore

(2)InterProcessSemaphoreV2的初始化

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

 

Semaphore信号量,就是指定同时可以有多个线程获取到锁。

 

(1)基于InterProcessSemaphoreV2使用Semaphore

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");
        //获取Semaphore
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphore", 3);
        Lease lease = semaphore.acquire();//获取Semaphore的一个锁
        Thread.sleep(3000);
        semaphore.returnLease(lease);//向Semaphore返还一个锁
    }
}

(2)InterProcessSemaphoreV2的初始化

public class InterProcessSemaphoreV2 {
    private final WatcherRemoveCuratorFramework client;
    private final InterProcessMutex lock;
    private final String leasesPath;
    private volatile int maxLeases;
    ...
    //maxLeases表示该实例可以允许获取的lease数量
    public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
        this(client, path, maxLeases, null);
    }
    
    //初始化InterProcessSemaphoreV2时,传入的参数path = "/semaphore",参数maxLeases = 3
    private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
        this.client = client.newWatcherRemoveCuratorFramework();
        path = PathUtils.validatePath(path);
        //锁的path是ZKPaths.makePath(path, LOCK_PARENT) => '/semaphore/locks'
        //初始化一个InterProcessMutex分布式锁
        this.lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
        this.maxLeases = (count != null) ? count.getCount() : maxLeases;
        //lease的path是:'/semaphore/leases'
        this.leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
        ...
    }
    ...
}

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

客户端线程尝试获取Semaphore的一个Lease。

 

步骤一:首先会获取初始化时创建的锁InterProcessMutex

锁的路径是:/semaphore/locks。当多个客户端线程同时执行acquire()获取Lease时只会有一个线程成功,而其他线程会基于锁路径下的临时顺序节点来排队获取锁。

 

步骤二:获取锁成功后才会尝试获取Semaphore的Lease

Lease的路径是:/semaphore/leases。此时会先到'/semaphore/leases'目录下创建一个临时顺序节点,然后会调用InterProcessSemaphoreV2的makeLease()方法创建一个Lease。这个Lease对象就是客户端线程成功获取Semaphore的一个Lease。

 

创建完Lease对象后,接着会进入一个for循环,会先获取/semaphore/leases目录下的所有临时顺序节点,并添加监听。然后判断/semaphore/leases目录下节点的数量是否大于maxLeases。如果临时顺序节点的数量小于maxLeases,那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环。如果临时顺序节点的数量大于maxLeases,那么当前客户端线程就要调用wait()进行阻塞等待。

public class InterProcessSemaphoreV2 {
    private final InterProcessMutex lock;
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程
            client.postSafeNotify(InterProcessSemaphoreV2.this);
        }
    };
    ...
    public Lease acquire() throws Exception {
        Collection<Lease> leases = acquire(1, 0, null);
        return leases.iterator().next();
    }
    
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception {
        long startMs = System.currentTimeMillis();
        boolean hasWait = (unit != null);
        long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
        Preconditions.checkArgument(qty > 0, "qty cannot be 0");
        ImmutableList.Builder<Lease> builder = ImmutableList.builder();
        boolean success = false;
        try {
            while (qty-- > 0) {
                int retryCount = 0;
                long startMillis = System.currentTimeMillis();
                boolean isDone = false;
                while (!isDone) {
                    switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {
                        case CONTINUE: {
                            isDone = true;
                            break;
                        }
                        case RETURN_NULL: {
                            return null;
                        }
                        case RETRY_DUE_TO_MISSING_NODE: {
                            if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                                throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
                            }
                            //try again
                            break;
                        }
                    }
                }
            }
            success = true;
        } finally {
            if (!success) {
                returnAll(builder.build());
            }
        }
        return builder.build();
    }
    
    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception {
        if (client.getState() != CuratorFrameworkState.STARTED) {
            return InternalAcquireResult.RETURN_NULL;
        }
        if (hasWait) {
            long thisWaitMs = getThisWaitMs(startMs, waitMs);
            if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {
                return InternalAcquireResult.RETURN_NULL;
            }
        } else {
            //1.首先获取一个分布式锁
            lock.acquire();
        }
        Lease lease = null;
        boolean success = false;
        try {
            //2.尝试获取Semaphore的Lease:创建一个临时顺序节点
            PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
            String nodeName = ZKPaths.getNodeFromPath(path);
            lease = makeLease(path);
            ...
            try {
                synchronized(this) {
                    for(;;) {
                        List<String> children;
                        //3.获取./lease目录下的所有临时顺序节点,并添加watcher监听
                        children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
                        ...
                        //4.判断临时顺序节点的数量是否大于maxLeases
                        //maxLeases表示最多允许多少个客户端线程获取Semaphore的Lease
                        if (children.size() <= maxLeases) {
                            //如果临时顺序节点的数量小于maxLeases
                            //那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环
                            break;
                        }
                        //如果临时顺序节点的数量大于maxLeases
                        //那么当前客户端线程就要调用wait()进行阻塞等待
                        if (hasWait) {
                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
                            if (thisWaitMs <= 0) {
                                return InternalAcquireResult.RETURN_NULL;
                            }
                            ...
                            wait(thisWaitMs);
                        } else {
                            ...
                            wait();
                        }
                    }
                    success = true;
                }
            } finally {
                if (!success) {
                    returnLease(lease);
                }
                client.removeWatchers();
            }
        } finally {
            //释放掉之前获取的锁
            lock.release();
        }
        builder.add(Preconditions.checkNotNull(lease));
        return InternalAcquireResult.CONTINUE;
    }
    ...
}

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

执行InterProcessSemaphoreV2的returnLease()方法时,最终会执行makeLease()生成的Lease对象的close()方法,而close()方法会删除在/semaphore/leases目录下创建的临时顺序节点。

 

当/semaphore/leases目录下的节点发生变化时,那些对该目录进行Watcher监听的客户端就会收到通知,于是就会执行Watcher里的process()方法,唤醒执行wait()时被阻塞的线程,从而让这些没有成功获取Semaphore的Lease的线程继续尝试获取Lease。

public class InterProcessSemaphoreV2 {
    ...
    public void returnLease(Lease lease) {
        //执行Lease的close()方法
        CloseableUtils.closeQuietly(lease);
    }
    
    private Lease makeLease(final String path) {
        return new Lease() {
            @Override
            public void close() throws IOException {
                try {
                    client.delete().guaranteed().forPath(path);
                } catch (KeeperException.NoNodeException e) {
                    log.warn("Lease already released", e);
                } catch (Exception e) {
                    ThreadUtils.checkInterrupted(e);
                    throw new IOException(e);
                }
            }
            
            @Override
            public byte[] getData() throws Exception {
                return client.getData().forPath(path);
            }
            
            @Override
            public String getNodeName() {
                return ZKPaths.getNodeFromPath(path);
            }
        };
    }
    ...
}

 

相关文章
|
4月前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
726 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
4月前
|
NoSQL Java 测试技术
【📕分布式锁通关指南 05】通过redisson实现分布式锁
本文介绍了如何使用Redisson框架在SpringBoot中实现分布式锁,简化了之前通过Redis手动实现分布式锁的复杂性和不完美之处。Redisson作为Redis的高性能客户端,封装了多种锁的实现,使得开发者只需关注业务逻辑。文中详细展示了引入依赖、配置Redisson客户端、实现扣减库存功能的代码示例,并通过JMeter压测验证了其正确性。后续篇章将深入解析Redisson锁实现的源码。
96 0
【📕分布式锁通关指南 05】通过redisson实现分布式锁
|
4月前
|
运维 NoSQL 算法
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
169 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
|
5月前
|
NoSQL 关系型数据库 MySQL
分布式系统学习9:分布式锁
本文介绍了分布式系统中分布式锁的概念、实现方式及其应用场景。分布式锁用于在多个独立的JVM进程间确保资源的互斥访问,具备互斥、高可用、可重入和超时机制等特点。文章详细讲解了三种常见的分布式锁实现方式:基于Redis、Zookeeper和关系型数据库(如MySQL)。其中,Redis适合高性能场景,推荐使用Redisson库;Zookeeper适用于对一致性要求较高的场景,建议基于Curator框架实现;而基于数据库的方式性能较低,实际开发中较少使用。此外,还探讨了乐观锁和悲观锁的区别及适用场景,并介绍了如何通过Lua脚本和Redis的`SET`命令实现原子操作,以及Redisson的自动续期机
624 7
|
3月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
337 0
分布式爬虫框架Scrapy-Redis实战指南
|
1月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
190 67
|
24天前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
93 3
|
1月前
|
缓存 监控 NoSQL
Redis设计与实现——分布式Redis
Redis Sentinel 和 Cluster 是 Redis 高可用与分布式架构的核心组件。Sentinel 提供主从故障检测与自动切换,通过主观/客观下线判断及 Raft 算法选举领导者完成故障转移,但存在数据一致性和复杂度问题。Cluster 支持数据分片和水平扩展,基于哈希槽分配数据,具备自动故障转移和节点发现机制,适合大规模高并发场景。复制机制包括全量同步和部分同步,通过复制积压缓冲区优化同步效率,但仍面临延迟和资源消耗挑战。两者各有优劣,需根据业务需求选择合适方案。
|
1月前
|
数据采集 存储 NoSQL
分布式爬虫去重:Python + Redis实现高效URL去重
分布式爬虫去重:Python + Redis实现高效URL去重
|
4月前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
291 83