分布式改造剧集2---DIY分布式锁

简介: 前言:​ 好了,终于又开始播放分布式改造剧集了。前面一集中(http://www.cnblogs.com/Kidezyq/p/8748961.html)我们DIY了一个Hessian转发实现,最后我们也留下了一个展望方向:可以实现一个管理界面管理节点,实现简单的服务治理的功能。

前言:

​ 好了,终于又开始播放分布式改造剧集了。前面一集中(http://www.cnblogs.com/Kidezyq/p/8748961.html)我们DIY了一个Hessian转发实现,最后我们也留下了一个展望方向:可以实现一个管理界面管理节点,实现简单的服务治理的功能。这一集我们接着继续DIY分布式锁。

第二集:分布式锁DIY

探索之路

​ 由于业务互斥的需要,当前项目中实现了一个内存锁。锁的大致模型是分为锁类型和锁键值,只有当锁类型和键值都相同的时候,整个业务才互斥。但是必须提供一个方法,来判断某种类型的锁是否存在。大致代码如下:

/**
 * 内存锁
 *
 */
public class MemoryLock {
    /**
     * 同步锁
     */
    private final Object lock = new Object();
    
    /**
     * 内存锁模型
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lockMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>();

    /**
     * 尝试获取到锁
     * @param lockType 锁类型
     * @param key       锁键值
     * @return 如果当前获取到锁,则返回true。否则,返回false。
     */
    private boolean tryLock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentHashMap<String, String> map = this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, String>();
                this.lockMap.put(lockType, map);
            }
            return (map.putIfAbsent(key, key) == null);
        }
    }
    
    /**
     * 判断某种类型的锁是不是空的
     * @param lockType  锁类型
     * @return true,不存在某种类型的锁;false,存在某种类型的锁。
     */
    public boolean isLockTypeEmpty(String lockType) {
        if (null != this.lockMap.get(lockType)) {
            return this.lockMap.get(lockType).size() == 0;
        }
        return true;
    }

    /**
     * 获取锁
     * @param lockType  锁类型
     * @param key       锁键值
     * @param timeout   超时时间(毫秒)
     * @throws TimeoutException 如果超时之后还没有获得到锁,则抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));
        
        throw new TimeoutException();
    }

    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key      锁键值
     */
    public void unlock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentMap<String, String> map = this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
    }
}

​ 可以看到,单机模式下的互斥锁是直接在内存中保存一个ConcurrentHashMap,然后利用putIfAbsent的原子特性。该锁的使用方式如下:

try {
    memoryLock.lock(lockType, lockKey, 0l);
} catch(TimeOutException e) {
    // TODO: Exception caught  
} finally {
    memoryLock.unlock(lockType, lockKey);
}

​ 当应用部署在分布式环境中的时候。显然,原来的内存锁已经不适用。那么在分布式情况下,如何实现锁服务呢?网上给出的分布式锁的实现方案一般有三种:

  1. 利用数据库的for update行锁
  2. 利用Redis的setnx
  3. 利用zookeeper的分布式一致性算法

​ 考虑到尽量不增加新的应用部署,那么先排除2、3,只剩下数据库的行级锁。但其实数据库的行级锁在并发量特别大的时候会对数据库性能造成较大影响,而且估计我想使用DBA都不会允许.....

​ 那么,有没有什么其他更好的办法呢?这次我们利用曲线救国的方式来实现,将分布式转变成非分布式。


实现Demo

​ 在分布式改造剧集第一集中,我们的实现方式中有一个主节点,主节点为配置文件中默认配置的Hessian服务的地址。只有加上了Distribute注解的服务,才会在客户端进行Hessian调用的时候进行路由,否则最终调用的Hessian服务地址即为配置文件中配置的主节点。依赖于这个特性,我们可以不给锁服务添加Distribute注解,使得所有分机部署的服务请求都落到主节点上。具体实现步骤如下:

定义一个内存锁Hessian服务

​ 其实简单来说我们直接将原来的MemoryLock发布成Hessian服务,并且不使用Distribute注解就可以实现将分布式锁转换成单机锁。但是还有以下两点需要特殊考虑:

  1. 分布式服务的多机特性: 内存锁的释放必须显示释放,如果一个服务调用unlock方法之前就挂掉,就可能导致某一个锁永远被锁住。所以我们还需要一个类似于Redis分布式锁实现中的锁超时移除机制。
  2. 远程RPC调用的可能超时: 最终锁的服务调用是需要通过Hessian来实现的,考虑到Hessian调用存在超时时间,如果将前面MemoryLocklock方法等待实现在Hessian服务中,那么等待时间超长的话会直接导致Hessian服务调用超时。所以改造后的MemoryLock不实现lock方法,只实现tryLock方法,调用该方法时立即返回当前是否可以获得到锁。
  3. 本地服务实现锁等待以及减少Hessian调用: 如第2点所说,我们的锁等待特性不能在内存锁的Hessian服务中实现,只能通过本地服务中实现。另外频繁的Hessian调用会影响应用程序的性能,也需要一个本地的锁服务来巧妙地减少远程服务调用

​ 改造后的MemoryLock代码如下:

@Service("moemoryLockServiceFacade")
public class MemoryLockServiceImpl implements MemoryLockService {
    
    /**
     * 自动超时时间:当前设置为10分钟 单位为纳秒
     */
    private final static long AUTO_EXPIRE_TIME = 1000000000l * 60 * 10;
    
    /**
     * 锁
     */
    private Object semaphore = new Object();

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁的加入时间
     */
    private ConcurrentMap<String, ConcurrentMap<Object, Long>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<Object, Long>>();
    
    /**
     * 守护线程: 用来清理过期内存缓存(如果加锁的客户端由于各种原因没有显示解锁,则可能出现其他服务无法获取锁的情况)
     */
    private Thread daemonThread;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLockServiceImpl.class);
    
    /**
     * 是否终止守护线程的标识
     */
    private volatile boolean stop = false;
    
    /**
     * 清理失效锁的线程
     *
     */
    private class ClearExpireLockThread extends Thread {
        
        @Override
        public void run() {
            Iterator<Entry<String, ConcurrentMap<Object, Long>>> outerIterator = null;
            Iterator<Entry<Object, Long>> innerIterator = null;
            
            // 清理超过超时时间的锁
            while (!stop) {
                synchronized (semaphore) {
                    long expireNanoTimes = System.nanoTime() - AUTO_EXPIRE_TIME;    // 算出超时时间,小于该时间的缓存都应该被移除
                    outerIterator = lockMap.entrySet().iterator();
                    while (outerIterator.hasNext()) {
                        Entry<String, ConcurrentMap<Object, Long>> entrySet = outerIterator.next();
                        innerIterator = entrySet.getValue().entrySet().iterator();
                        boolean allDeleted = true;  // 是否全部删除的标识,默认设为true
                        while (innerIterator.hasNext()) {
                            Entry<Object, Long> innerEntry = innerIterator.next();
                            if (expireNanoTimes > innerEntry.getValue()) {
                                innerIterator.remove();
                                LOGGER.info("守护线程移除类型为【{}】键值为【{}】的锁......", entrySet.getKey(), innerEntry.getKey());
                            } else {
                                allDeleted = false;
                            }
                        }
                        
                        // 如果所类型下的所有锁都被清除,则锁类型也该被移除
                        if (allDeleted) {
                            outerIterator.remove();
                            LOGGER.info("守护线程移除类型为【{}】的锁......", entrySet.getKey());
                        }
                    }
                }
                
                try {
                    // 如果超时时间为1秒,则等待千分之一秒
                    Thread.sleep(AUTO_EXPIRE_TIME / 1000000000l);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    /**
     * 终止守护线程
     */
    @PreDestroy
    public void stopDeamonThread() {
        this.stop = true;
        this.daemonThread.interrupt();
    }
    
    /**
     * 初始化守护线程,用来扫描移除超时的内存锁
     */
    @PostConstruct
    public void initDeamonThread() {
        daemonThread = new ClearExpireLockThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }

    @Override
    public boolean tryLock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<Object, Long>();
                this.lockMap.put(lockType, map);
            }
            
            // 这里的value值设置为加锁的初始时间
            return (map.putIfAbsent(key, System.nanoTime()) == null);
        }
    }

    @Override
    public boolean isLockTypeEmpty(String lockType){
        return MapUtils.isEmpty(this.lockMap.get(lockType));
    }
    
    @Override
    public void unlock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map != null) {
                map.remove(key);
                LOGGER.info("手工释放类型为【{}】键值为【{}】的锁......", lockType, key);
            }
        }
    }
}

定义一个分布式锁管理服务实现

​ 定义一个DistributeLock服务,该服务作为本地服务,用来实现锁等待以及减少Hessian锁请求调用。在本地锁服务中注入原来的内存锁Hessian服务实现。具体代码如下:

/**
 * 分布式锁管理类
 *
 */
@Service
public class DistributeLock {
    /**
     * 注入hessian接口的实现类
     */
    @Resource(name="moemoryLockServiceFacade")
    private MemoryLockService memoryLockService;
    
    private Object semaphore = new Object(); 

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁住的尝试远程hessian调用获取锁的线程
     */
    private ConcurrentMap<String, ConcurrentMap<String, Thread>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<String, Thread>>();

    /**
     * 判断是否能够获得锁,不阻塞立即返回
     * @param lockType 锁类型
     * @param key  锁的键值
     * @return  true,能够获得锁.false,不能获得锁.
     */
    private boolean tryLock(String lockType, String key) {
        // 提升效率,先内部map判断是否存在锁,如果存在,则直接等待
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, Thread>();
                this.lockMap.put(lockType, map);
            }
            Thread t = map.putIfAbsent(key, Thread.currentThread());
            
             // 单个服务只有首先获得本机内存锁的线程才有机会去远程调用hessian服务判断是否有锁
            if (t != null && Thread.currentThread() != t) {
                return false;
            }
        }
        
        return memoryLockService.tryLock(lockType, key);
    }
    
    /**
     * 获得锁,在获得锁之前阻塞
     * @param lockType  锁类型
     * @param key   锁键值
     * @param timeout 超时时间
     * @throws TimeoutException 超时抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));

        synchronized(this.semaphore) {
            // 需释放当前线程占用的本地内存锁
            this.lockMap.get(lockType).remove(key, Thread.currentThread());
        }
        
        throw new TimeoutException();
    }

    /**
     * 是否指定的锁类型,当前锁的数量为空
     * @param lockType 锁类型
     * @return true,当前锁类型的锁的数量为空;false,当前锁类型的锁锁的数量不为空
     */
    public boolean isLockTypeEmpty(String lockType){
        // 直接内部判断
        if (MapUtils.isNotEmpty(lockMap.get(lockType))) {
            return false;
        }
        
        // 内部判断成功还需远程调用判断
        return memoryLockService.isLockTypeEmpty(lockType);
    }
    
    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key       锁的键值
     */
    public void unlock(String lockType, String key) {
        // 移除本机内存锁模型
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
        
        // 远程调用释放锁
        memoryLockService.unlock(lockType, key);
    }
}

​ 好了,分布式锁的Demo顺利完成。使用的时候只要将原来的MemoryLock替换成DistributeLock即可。


展望

​ 分布式锁的实现就到这里,其实现的本质在于将分布式转变成非分布式。这里也可以说我是钻了"分布式"的空子

​ 那么既然分布式锁的最终实现也是通过内存锁实现的,且利用了主节点的特性。那么其实我们在实现分布式锁之后,还有下面两个方向可以优化:

  1. 锁管理: 可以增加一个锁管理页面,来展示当前内存中存在的锁,以及移除需要马上移除的锁
  2. 主节点替换: 当前的分布式锁的实现还是依赖于主节点。考虑到主节点可能也挂掉,需要增加主节点可以动态切换的功能。严格上来讲这个是分布式改造剧集1应该实现的功能

后续

​ 好了,分布式锁的改造暂且到此。可以看到其实分布式其实并没有我们想象的这么复杂,分布式技术也没有特别地遥不可及。面对不断革新的技术,我们应该除了拿来主义之外,多思考,真正了解技术背后的实现原理。就像我一直认为的:相比于用轮子造轮子的能力要重要的多

黎明前最黑暗,成功前最绝望!
相关文章
|
4月前
|
缓存 NoSQL 算法
认真学习分布式应用中的分布式锁
认真学习分布式应用中的分布式锁
58 0
|
4月前
|
缓存 算法 NoSQL
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式系统通过副本控制协议,使得从系统外部读取系统内部各个副本的数据在一定的约束条件下相同,称之为副本一致性(consistency)。副本一致性是针对分布式系统而言的,不是针对某一个副本而言。强一致性(strong consistency):任何时刻任何用户或节点都可以读到最近一次成功更新的副本数据。强一致性是程度最高的一致性要求,也是实践中最难以实现的一致性。单调一致性(monotonic consistency):任何时刻,任何用户一旦读到某个数据在某次更新后的值,这个用户不会再读到比这个值更旧的值。
578 0
|
4月前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(一)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
67 0
|
23天前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
27天前
|
存储 调度
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
|
2月前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
|
23天前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
2月前
|
NoSQL 算法 Java
(十三)全面理解并发编程之分布式架构下Redis、ZK分布式锁的前世今生
本文探讨了从单体架构下的锁机制到分布式架构下的线程安全问题,并详细分析了分布式锁的实现原理和过程。
|
2月前
|
数据库
分布式锁实现问题之数据库中的分布式锁有哪些缺点
分布式锁实现问题之数据库中的分布式锁有哪些缺点
|
27天前
|
存储 调度
分布式锁设计问题之分布式锁系统通常设计其架构如何解决
分布式锁设计问题之分布式锁系统通常设计其架构如何解决
下一篇
DDNS