听说微信搜索《Java鱼仔》会变更强哦!
本文收录于JavaStarter ,里面有我完整的Java系列文章,学习或面试都可以看看哦
(一)引言
在单体环境中,遇到临界资源的时候我们会使用Synchronized或者RetreenLock在调用临界资源前上锁。但是在分布式的环境下,锁住单体资源就不起作用了,这个时候就需要用到分布式锁。分布式锁的原理就是借用外部的一个系统来充当锁的作用,比如Mysql、Redis、Zookeeper等都可以用作分布式锁。在实际业务中,Redis和Zookeeper用到的最多。
(二)Zookeeper锁的原理
锁分为两种:共享锁(读锁)和排他锁(写锁) 读锁:当有一个线程获取读锁后,其他线程也可以获取读锁,但是在读锁没有完全被释放之前,其他线程不能获取写锁。写锁:当有一个线程获取写锁后,其他线程就无法获取读锁和写锁了。
zookeeper有一种节点类型叫做临时序号节点,它会按序号自增地创建临时节点,这正好可以作为分布式锁的实现工具。
读锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockR0000000005 Read 2、获取/lock下的所有子节点,判断比他小的节点是否全是读锁,如果是读锁则获取锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点。 4、当前面一个节点发生变更时,重新执行第二步操作。
写锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockW0000000006 Write 2、获取 /lock 下所有子节点,判断最小的节点是否为自己,如果是则获锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点 4、当前面一个节点发生变更时,重新执行第二步。
通过一张图更清晰地看出现象:首先是写锁,因为写锁不是最前面的节点,所以阻塞了,008读锁因为前面并不是所有都是读锁,所以阻塞了
释放锁: 删除对应的临时节点即可,如果服务器宕机了,因为临时节点的原理也不会发生死锁的情况。
(三)代码实现
真实的场景中,一般来说为了效率不会上读锁,想想看如果有人在查看数据,你就不能去修改了,这样效率是不是特别低。这里用代码实现分布式写锁,首先自己定义一个锁类
publicclassLock { privateStringlockId; privateStringpath; privatebooleanactive; publicLock(StringlockId, StringnodePath) { this.lockId=lockId; this.path=nodePath; } }
再通过Zookeeper写一个加锁工具类,代码已经给了注释,里面的实现原理和上面所讲的写锁获取原理一致:
publicclassZookeeperLock { privateStringserver="192.168.78.128:2181"; privateZkClientzkClient; privatestaticfinalStringrootPath="/lock"; //初始化ZkClient,并创建根节点publicZookeeperLock(){ zkClient=newZkClient(server,5000,20000); buildRoot(); } //创建根节点publicvoidbuildRoot(){ //如果根节点不存在,就创建if (!zkClient.exists(rootPath)){ zkClient.createPersistent(rootPath); System.out.println("创建根节点成功"); } } publicLocklock(StringlockId,longtimeout){ //创建一个临时节点LocklockNode=createLockNode(lockId); //尝试去激活锁lockNode=tryActiveLock(lockNode); //如果没有激活,则等待timeout的时间if (!lockNode.isActive()){ try { synchronized (lockNode){ lockNode.wait(timeout); } } catch (InterruptedExceptione) { e.printStackTrace(); } } //timeout时间内节点还未释放,就报lock timeout错误if (!lockNode.isActive()){ thrownewRuntimeException("lock timeout"); } returnlockNode; } //释放锁publicvoidunlock(Locklock){ if (lock.isActive()){ zkClient.delete(lock.getPath()); } } //尝试激活锁privateLocktryActiveLock(LocklockNode){ //获取所有的子节点List<String>childList=zkClient.getChildren(rootPath) .stream() .sorted() .map(p->rootPath+"/"+p) .collect(Collectors.toList()); //获取第一个元素StringfirstNodePath=childList.get(0); //如果自己就是第一个节点,就激活锁if (firstNodePath.equals(lockNode.getPath())){ lockNode.setActive(true); }else { //否则监听前一个锁StringupNodePath=childList.get(childList.indexOf(lockNode.getPath())-1); zkClient.subscribeDataChanges(upNodePath, newIZkDataListener() { publicvoidhandleDataChange(Strings, Objecto) throwsException { } //如果前面一个节点被删除了,再次尝试获取锁publicvoidhandleDataDeleted(Strings) throwsException { System.out.println("节点删除"+s); Locklock=tryActiveLock(lockNode); synchronized (lockNode){ if (lock.isActive()){ lockNode.notify(); } } zkClient.unsubscribeDataChanges(upNodePath,this); } }); } returnlockNode; } publicLockcreateLockNode(StringlockId) { StringnodePath=zkClient.createEphemeralSequential(rootPath+"/"+lockId, "lock"); returnnewLock(lockId, nodePath); } }
(四)测试
上面写的这个工具类,以后可以直接拿过来用,我们来测试一下,首先是不加锁开100个线程去加一个变量:
publicclassTest { privateintflag=0; privateZookeeperLockzookeeperLock=newZookeeperLock(); publicvoidtestLock() throwsInterruptedException { ExecutorServiceexecutorService=Executors.newCachedThreadPool(); for (inti=0; i<100; i++) { executorService.submit(()->{ flag++; }); } executorService.shutdown(); executorService.awaitTermination(10, TimeUnit.SECONDS); System.out.println(flag); } }
最后的返回结果永远到不了100,因为存在更新丢失。 加上锁:
publicclassTest { privateintflag=0; privateZookeeperLockzookeeperLock=newZookeeperLock(); publicvoidtestLock() throwsInterruptedException { ExecutorServiceexecutorService=Executors.newCachedThreadPool(); for (inti=0; i<100; i++) { executorService.submit(()->{ Locklock=zookeeperLock.lock("myLock", 60*1000); flag++; zookeeperLock.unlock(lock); }); } executorService.shutdown(); executorService.awaitTermination(10, TimeUnit.SECONDS); System.out.println(flag); } }
最后的结果一直都是100。
(五)总结
只要懂得分布式锁的原理,代码的实现就会变得十分简单。你会累是因为你在走上坡路!我们下期再见。