专栏|分布式锁系列

简介: zookeeper

## Zookeeper


### Apache-Curator


![InterProcessMutex](InterProcessMutex.png)


如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。Apache-Curator中提供的常见锁有如下:


- **InterProcessMutex**:就是公平锁的实现。可重入、独占锁

- **InterProcessSemaphoreMutex**:不可重入、独占锁

- **InterProcessReadWriteLock**:读写锁

- **InterProcessSemaphoreV2**:共享信号量

- **InterProcessMultiLock**:多重共享锁 (将多个锁作为单个实体管理的容器)




### 使用案例


```java

import java.util.Arrays;

import java.util.Collection;

import java.util.HashSet;

import java.util.Set;

import java.util.concurrent.TimeUnit;


import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.locks.InterProcessLock;

import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;

import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;

import org.apache.curator.framework.recipes.locks.Lease;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.utils.CloseableUtils;

import org.junit.After;

import org.junit.Assert;

import org.junit.Before;

import org.junit.Test;


public class DistributedLockDemo {


// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行

private final String lockPath = "/distributed-lock";

// ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),

// 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)

private String connectString="127.0.0.1:2181";

// Curator 客户端重试策略

private RetryPolicy retry;

// Curator 客户端对象

private CuratorFramework client1;

// client2 用户模拟其他客户端

private CuratorFramework client2;


// 初始化资源

@Before

public void init() throws Exception {

 // 重试策略

 // 初始休眠时间为 1000ms, 最大重试次数为 3

 retry = new ExponentialBackoffRetry(1000, 3);

 // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间

 client1 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);

 client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);

 // 创建会话

 client1.start();

 client2.start();

}


// 释放资源

@After

public void close() {

 CloseableUtils.closeQuietly(client1);

}


/**

 * InterProcessMutex:可重入、独占锁

 */

@Test

public void sharedReentrantLock() throws Exception {

 // 创建可重入锁

 InterProcessMutex lock1 = new InterProcessMutex(client1, lockPath);

 // lock2 用于模拟其他客户端

 InterProcessMutex lock2 = new InterProcessMutex(client2, lockPath);

 

 // lock1 获取锁

 lock1.acquire();

 try {

  // lock1 第2次获取锁

  lock1.acquire();

  try {

   // lock2 超时获取锁, 因为锁已经被 lock1 客户端占用, 所以lock2获取锁失败, 需要等 lock1 释放

   Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));

  } finally {

   lock1.release();

  }

 } finally {

  // 重入锁获取与释放需要一一对应, 如果获取 2 次, 释放 1 次, 那么该锁依然是被占用,

  // 如果将下面这行代码注释, 那么会发现下面的 lock2

  // 获取锁失败

  lock1.release();

 }

 

 // 在 lock1 释放后, lock2 能够获取锁

 Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));

 lock2.release();

}


/**

 * InterProcessSemaphoreMutex: 不可重入、独占锁

 */

@Test

public void sharedLock() throws Exception {

 InterProcessSemaphoreMutex lock1 = new InterProcessSemaphoreMutex(client1, lockPath);

 // lock2 用于模拟其他客户端

 InterProcessSemaphoreMutex lock2 = new InterProcessSemaphoreMutex(client2, lockPath);


 // 获取锁对象

 lock1.acquire();


 // 测试是否可以重入

 // 因为锁已经被获取, 所以返回 false

 Assert.assertFalse(lock1.acquire(2, TimeUnit.SECONDS));// lock1 返回是false

 Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));// lock2 返回是false


 // lock1 释放锁

 lock1.release();


 // lock2 尝试获取锁成功, 因为锁已经被释放

 Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));// 返回是true

 lock2.release();

 System.out.println("测试结束");

}


/**

 * InterProcessReadWriteLock:读写锁.

 * 特点:读写锁、可重入

 */

@Test

public void sharedReentrantReadWriteLock() throws Exception {

 // 创建读写锁对象, Curator 以公平锁的方式进行实现

 InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client1, lockPath);

 // lock2 用于模拟其他客户端

 InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);

 

 // 使用 lock1 模拟读操作

 // 使用 lock2 模拟写操作

 // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)

 final InterProcessLock readLock = lock1.readLock();

 // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)

 final InterProcessLock writeLock = lock2.writeLock();


 /**

  * 读写锁测试对象

  */

 class ReadWriteLockTest {

  // 测试数据变更字段

  private Integer testData = 0;

  private Set<Thread> threadSet = new HashSet<>();


  // 写入数据

  private void write() throws Exception {

   writeLock.acquire();

   try {

    Thread.sleep(10);

    testData++;

    System.out.println("写入数据 \t" + testData);

   } finally {

    writeLock.release();

   }

  }


  // 读取数据

  private void read() throws Exception {

   readLock.acquire();

   try {

    Thread.sleep(10);

    System.out.println("读取数据 \t" + testData);

   } finally {

    readLock.release();

   }

  }


  // 等待线程结束, 防止 test 方法调用完成后, 当前线程直接退出, 导致控制台无法输出信息

  public void waitThread() throws InterruptedException {

   for (Thread thread : threadSet) {

    thread.join();

   }

  }


  // 创建线程方法

  private void createThread(final int type) {

   Thread thread = new Thread(new Runnable() {

    @Override

    public void run() {

     try {

      if (type == 1) {

       write();

      } else {

       read();

      }

     } catch (Exception e) {

      e.printStackTrace();

     }

    }

   });

   threadSet.add(thread);

   thread.start();

  }


  // 测试方法

  public void test() {

   for (int i = 0; i < 5; i++) {

    createThread(1);

   }

   for (int i = 0; i < 5; i++) {

    createThread(2);

   }

  }

 }


 ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();

 readWriteLockTest.test();

 readWriteLockTest.waitThread();

}


/**

 * InterProcessSemaphoreV2 共享信号量

 */

@Test

public void semaphore() throws Exception {

 // 创建一个信号量, Curator 以公平锁的方式进行实现

 InterProcessSemaphoreV2 semaphore1 = new InterProcessSemaphoreV2(client1, lockPath, 6);

 // semaphore2 用于模拟其他客户端

 InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);


 // 获取一个许可

 Lease lease1 = semaphore1.acquire();

 Assert.assertNotNull(lease1);

 // semaphore.getParticipantNodes() 会返回当前参与信号量的节点列表, 俩个客户端所获取的信息相同

 Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());


 // 超时获取一个许可

 Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);

 Assert.assertNotNull(lease2);

 Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());


 // 获取多个许可, 参数为许可数量

 Collection<Lease> leases = semaphore1.acquire(2);

 Assert.assertTrue(leases.size() == 2);

 Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());


 // 超时获取多个许可, 第一个参数为许可数量

 Collection<Lease> leases2 = semaphore2.acquire(2, 2, TimeUnit.SECONDS);

 Assert.assertTrue(leases2.size() == 2);

 Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());


 // 目前 semaphore 已经获取 3 个许可, semaphore2 也获取 3 个许可, 加起来为 6 个, 所以他们无法再进行许可获取

 Assert.assertNull(semaphore1.acquire(2, TimeUnit.SECONDS));

 Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));


 // 释放一个许可

 semaphore1.returnLease(lease1);

 semaphore2.returnLease(lease2);

 // 释放多个许可

 semaphore1.returnAll(leases);

 semaphore2.returnAll(leases2);

}


/**

 * InterProcessMutex :可重入、独占锁

 * InterProcessSemaphoreMutex : 不可重入、独占锁

 * InterProcessMultiLock: 多重共享锁(将多个锁作为单个实体管理的容器)

 */

@Test

public void multiLock() throws Exception {

 InterProcessMutex mutex = new InterProcessMutex(client1, lockPath);

 InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client2, lockPath);

 //将上面的两种锁入到其中

 InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(mutex, semaphoreMutex));

 // 获取参数集合中的所有锁

 multiLock.acquire();

 // 因为存在一个不可重入锁, 所以整个 multiLock 不可重入

 Assert.assertFalse(multiLock.acquire(2, TimeUnit.SECONDS));

 // mutex 是可重入锁, 所以可以继续获取锁

 Assert.assertTrue(mutex.acquire(2, TimeUnit.SECONDS));

 // semaphoreMutex  是不可重入锁, 所以获取锁失败

 Assert.assertFalse(semaphoreMutex.acquire(2, TimeUnit.SECONDS));

 // 释放参数集合中的所有锁

 multiLock.release();

 // interProcessLock2 中的锁已经释放, 所以可以获取

 Assert.assertTrue(semaphoreMutex.acquire(2, TimeUnit.SECONDS));

}

}

```


相关文章
|
6月前
|
NoSQL Java Redis
如何通俗易懂的理解Redis分布式锁
在多线程并发的情况下,我们如何保证一个代码块在同一时间只能由一个线程访问呢?
62 2
|
NoSQL Redis
【面试题精讲】Redis如何实现分布式锁
【面试题精讲】Redis如何实现分布式锁
|
6月前
|
存储 NoSQL Java
分布式锁,Redission,其它实现问题讲解,以及面试题回答案例
分布式锁,Redission,其它实现问题讲解,以及面试题回答案例
214 1
|
6月前
|
NoSQL Java Redis
Redis入门到通关之分布式锁Rediision
Redis入门到通关之分布式锁Rediision
57 0
|
6月前
|
存储 NoSQL Java
阿里面试官问我Redis怎么实现分布式锁
分布式锁一般有三种实现方式:
72 0
|
6月前
|
存储 NoSQL Java
常见面试题知识点之:分布式锁
常见面试题知识点之:分布式锁
|
12月前
|
NoSQL 关系型数据库 MySQL
手把手实现分布式锁
手把手实现分布式锁
93 0
|
消息中间件 NoSQL JavaScript
图解 Redis 分布式锁,写得太好了!
图解 Redis 分布式锁,写得太好了!
|
NoSQL Java Redis
图解 Redis 分布式锁,写得太好了!(1)
图解 Redis 分布式锁,写得太好了!
110 0
图解 Redis 分布式锁,写得太好了!(1)
|
NoSQL Java Redis
图解 Redis 分布式锁,写得太好了!(2)
图解 Redis 分布式锁,写得太好了!
101 0
图解 Redis 分布式锁,写得太好了!(2)