如何使用Zookeeper去实现分布式锁?

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 在单体环境中,遇到临界资源的时候我们会使用Synchronized或者RetreenLock在调用临界资源前上锁。但是在分布式的环境下,锁住单体资源就不起作用了,这个时候就需要用到分布式锁。分布式锁的原理就是借用外部的一个系统来充当锁的作用,比如Mysql、Redis、Zookeeper等都可以用作分布式锁。在实际业务中,Redis和Zookeeper用到的最多。

听说微信搜索《Java鱼仔》会变更强哦!


本文收录于JavaStarter ,里面有我完整的Java系列文章,学习或面试都可以看看哦


(一)引言


在单体环境中,遇到临界资源的时候我们会使用Synchronized或者RetreenLock在调用临界资源前上锁。但是在分布式的环境下,锁住单体资源就不起作用了,这个时候就需要用到分布式锁。分布式锁的原理就是借用外部的一个系统来充当锁的作用,比如Mysql、Redis、Zookeeper等都可以用作分布式锁。在实际业务中,Redis和Zookeeper用到的最多。


(二)Zookeeper锁的原理


锁分为两种:共享锁(读锁)和排他锁(写锁) 读锁:当有一个线程获取读锁后,其他线程也可以获取读锁,但是在读锁没有完全被释放之前,其他线程不能获取写锁。写锁:当有一个线程获取写锁后,其他线程就无法获取读锁和写锁了


zookeeper有一种节点类型叫做临时序号节点,它会按序号自增地创建临时节点,这正好可以作为分布式锁的实现工具。


读锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockR0000000005  Read 2、获取/lock下的所有子节点,判断比他小的节点是否全是读锁,如果是读锁则获取锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点。 4、当前面一个节点发生变更时,重新执行第二步操作。


写锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockW0000000006  Write 2、获取 /lock 下所有子节点,判断最小的节点是否为自己,如果是则获锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点 4、当前面一个节点发生变更时,重新执行第二步。


通过一张图更清晰地看出现象:首先是写锁,因为写锁不是最前面的节点,所以阻塞了,008读锁因为前面并不是所有都是读锁,所以阻塞了


网络异常,图片无法展示
|


释放锁: 删除对应的临时节点即可,如果服务器宕机了,因为临时节点的原理也不会发生死锁的情况。


(三)代码实现


真实的场景中,一般来说为了效率不会上读锁,想想看如果有人在查看数据,你就不能去修改了,这样效率是不是特别低。这里用代码实现分布式写锁,首先自己定义一个锁类


@Data@AllArgsConstructor@NoArgsConstructorpublicclassLock {
privateStringlockId;
privateStringpath;
privatebooleanactive;
publicLock(StringlockId, StringnodePath) {
this.lockId=lockId;
this.path=nodePath;
    }
}


再通过Zookeeper写一个加锁工具类,代码已经给了注释,里面的实现原理和上面所讲的写锁获取原理一致:


publicclassZookeeperLock {
privateStringserver="192.168.78.128:2181";
privateZkClientzkClient;
privatestaticfinalStringrootPath="/lock";
//初始化ZkClient,并创建根节点publicZookeeperLock(){
zkClient=newZkClient(server,5000,20000);
buildRoot();
    }
//创建根节点publicvoidbuildRoot(){
//如果根节点不存在,就创建if (!zkClient.exists(rootPath)){
zkClient.createPersistent(rootPath);
System.out.println("创建根节点成功");
        }
    }
publicLocklock(StringlockId,longtimeout){
//创建一个临时节点LocklockNode=createLockNode(lockId);
//尝试去激活锁lockNode=tryActiveLock(lockNode);
//如果没有激活,则等待timeout的时间if (!lockNode.isActive()){
try {
synchronized (lockNode){
lockNode.wait(timeout);
                }
            } catch (InterruptedExceptione) {
e.printStackTrace();
            }
        }
//timeout时间内节点还未释放,就报lock timeout错误if (!lockNode.isActive()){
thrownewRuntimeException("lock timeout");
        }
returnlockNode;
    }
//释放锁publicvoidunlock(Locklock){
if (lock.isActive()){
zkClient.delete(lock.getPath());
        }
    }
//尝试激活锁privateLocktryActiveLock(LocklockNode){
//获取所有的子节点List<String>childList=zkClient.getChildren(rootPath)
                .stream()
                .sorted()
                .map(p->rootPath+"/"+p)
                .collect(Collectors.toList());
//获取第一个元素StringfirstNodePath=childList.get(0);
//如果自己就是第一个节点,就激活锁if (firstNodePath.equals(lockNode.getPath())){
lockNode.setActive(true);
        }else {
//否则监听前一个锁StringupNodePath=childList.get(childList.indexOf(lockNode.getPath())-1);
zkClient.subscribeDataChanges(upNodePath, newIZkDataListener() {
@OverridepublicvoidhandleDataChange(Strings, Objecto) throwsException {
                }
//如果前面一个节点被删除了,再次尝试获取锁@OverridepublicvoidhandleDataDeleted(Strings) throwsException {
System.out.println("节点删除"+s);
Locklock=tryActiveLock(lockNode);
synchronized (lockNode){
if (lock.isActive()){
lockNode.notify();
                        }
                    }
zkClient.unsubscribeDataChanges(upNodePath,this);
                }
            });
        }
returnlockNode;
    }
publicLockcreateLockNode(StringlockId) {
StringnodePath=zkClient.createEphemeralSequential(rootPath+"/"+lockId, "lock");
returnnewLock(lockId, nodePath);
    }
}


(四)测试


上面写的这个工具类,以后可以直接拿过来用,我们来测试一下,首先是不加锁开100个线程去加一个变量:


publicclassTest {
privateintflag=0;
privateZookeeperLockzookeeperLock=newZookeeperLock();
@TestpublicvoidtestLock() throwsInterruptedException {
ExecutorServiceexecutorService=Executors.newCachedThreadPool();
for (inti=0; i<100; i++) {
executorService.submit(()->{
flag++;
            });
        }
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println(flag);
    }
}


最后的返回结果永远到不了100,因为存在更新丢失。 加上锁:


publicclassTest {
privateintflag=0;
privateZookeeperLockzookeeperLock=newZookeeperLock();
@TestpublicvoidtestLock() throwsInterruptedException {
ExecutorServiceexecutorService=Executors.newCachedThreadPool();
for (inti=0; i<100; i++) {
executorService.submit(()->{
Locklock=zookeeperLock.lock("myLock", 60*1000);
flag++;
zookeeperLock.unlock(lock);
            });
        }
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println(flag);
    }
}


最后的结果一直都是100。


(五)总结


只要懂得分布式锁的原理,代码的实现就会变得十分简单。你会累是因为你在走上坡路!我们下期再见。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
131 2
|
4天前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
43 0
|
4天前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
91 1
|
1天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
4天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
4天前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
53 0
|
4天前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
50 0
|
4天前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
67 0
|
4天前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
302 0
|
4天前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
73 0