zookeeper 分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介:

分布式锁有很多,redis也可以实现分布式锁,

http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式锁)


zookeeper分布式锁步骤:

1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。

2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。

3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。

4、这么做,释放锁的时候,也会通知下一个节点。


什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。


zookeeper的状态和事件类型,提前了解一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
状态 KeeperState.Disconnected ( 0 )  断开
  * KeeperState.SyncConnected ( 3 )  同步连接状态
  * KeeperState.AuthFailed ( 4 ) 认证失败状态
  * KeeperState.ConnectedReadOnly ( 5 )  只读连接状态
  * KeeperState.SaslAuthenticated ( 6 ) SASL认证通过状态
  * KeeperState.Expired (- 112 )  过期状态
 
  // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
  * EventType.None (- 1 ), 无
  * EventType.NodeCreated ( 1 ),
  * EventType.NodeDeleted ( 2 ),
  * EventType.NodeDataChanged ( 3 ),  结点数据变化
  * EventType.NodeChildrenChanged ( 4 ); 结点子节点变化


下面是代码,自己敲下,理解一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package  com.lhcis.spider.system.annotation;
 
import  java.io.IOException;
import  java.io.UnsupportedEncodingException;
import  java.util.concurrent.CountDownLatch;
 
import  org.apache.zookeeper.CreateMode;
import  org.apache.zookeeper.KeeperException;
import  org.apache.zookeeper.WatchedEvent;
import  org.apache.zookeeper.Watcher;
import  org.apache.zookeeper.ZooDefs;
import  org.apache.zookeeper.ZooKeeper;
import  org.apache.zookeeper.data.Stat;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
 
/**
  * @author sdc
  *
  */
public  class  ZooDistributeLock  implements  Watcher {
 
     private  static  final  Logger LOG = LoggerFactory.getLogger(ZooDistributeLock. class );
 
     private  static  final  String LOCK_PATH =  "/zkLock" ;
 
     // 模拟开启的线程数
     private  static  final  int  THREAD_NUM =  5 ;
 
     // 用于等待所有线程都连接成功后再执行任务
     private  static  CountDownLatch startFlag =  new  CountDownLatch( 1 );
 
     // 用于确保所有线程执行完毕
     private  static  CountDownLatch threadFlag =  new  CountDownLatch(THREAD_NUM);
 
     private  ZooKeeper zk =  null ;
 
     private  String currentPath;
 
     private  String lockPath;
 
     public  static  void  main(String[] args) {
         for  ( int  i =  0 ; i < THREAD_NUM; i++) {
             final  int  j = i;
             new  Thread() {
                 @Override
                 public  void  run() {
                     ZooDistributeLock zooDistributeLock =  new  ZooDistributeLock();
                     try  {
                         zooDistributeLock.connection();
                         System.out.println( "连接"  + j);
                         zooDistributeLock.createNode();
                         System.out.println( "创建"  + j);
                         zooDistributeLock.getLock();
                         System.out.println( "获取锁"  + j);
                     catch  (IOException | InterruptedException | KeeperException e) {
                         e.printStackTrace();
                     }
                 }
             }.start();
         }
         try  {
             threadFlag.await();
             LOG.info( "所有线程执行完毕..." );
         catch  (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
     /**
      * Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
      * connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
      * server连接触发的事件,此时watcher有效,也就是能正常感知
      * Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
      * Expired,此时watcher失效,也就是不能正常感知
      */
     @Override
     public  void  process(WatchedEvent event) {
 
         Event.KeeperState state = event.getState();
         Event.EventType type = event.getType();
 
         if  (Event.KeeperState.SyncConnected == state) {
             if  (Event.EventType.None == type) {
                 // 标识连接成功
                 LOG.info( "成功连接上ZK服务器" );
                 startFlag.countDown();
             }
 
             if  (Event.EventType.NodeDeleted == type && event.getPath().equals( this .lockPath)) {
                 LOG.info( "node:"  this .lockPath +  "的锁已经被释放" );
                 try  {
                     // 上一个节点释放了,当前节点去获取锁
                     getLock();
                 catch  (KeeperException | InterruptedException e) {
                     LOG.error(e.getMessage(), e);
                 }
             }
         }
 
     }
 
     /**
      * 连接到 ZK
      *
      * @throws IOException
      */
     private  void  connection()  throws  IOException, InterruptedException {
 
         zk =  new  ZooKeeper( "127.0.0.1:2181" 5000 this );
 
         // 等待连接成功后再执行下一步操作
         startFlag.await();
     }
 
     // 创建节点,并初始化当前路径
     private  void  createNode()  throws  KeeperException, InterruptedException, UnsupportedEncodingException {
         this .currentPath =  this .zk.create(LOCK_PATH,  "" .getBytes( "UTF-8" ), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     }
 
     private  void  getLock()  throws  KeeperException, InterruptedException {
         if  (minNode()) {
             doSomething();
             // 释放锁
             releaseLock();
         }
     }
 
     /**
      * 当前是否为最小节点
      *
      * @return
      */
     private  boolean  minNode() {
 
         // 当前序号
         try  {
             initLockPath();
             // 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
             // zk.getData(this.lockPath, this, new Stat());
             zk.getData( this .lockPath,  true new  Stat());
             LOG.info( this .currentPath +  " 不是最小值,没有获取锁,等待 "  this .lockPath +  " 释放锁" );
             return  false ;
         catch  (KeeperException e) {
             LOG.info( this .currentPath +  " 是最小值,获得锁" );
             return  true ;
         catch  (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
         return  true ;
     }
 
     private  void  doSomething() {
         LOG.info( "处理业务逻辑..." );
     }
 
     /**
      * 释放锁并关闭连接
      *
      * @throws KeeperException
      * @throws InterruptedException
      */
     private  void  releaseLock()  throws  KeeperException, InterruptedException {
         Thread.sleep( 2000 );
         if  ( this .zk !=  null ) {
             LOG.info( this .currentPath +  " 业务处理完毕,释放锁..." );
             zk.delete( this .currentPath, - 1 );
             this .zk.close();
             LOG.info(Thread.currentThread().getName() +  "关闭 zookeeper 连接" );
         }
         threadFlag.countDown();
     }
 
     /**
      * 初始化 lockpath
      */
     private  void  initLockPath() {
 
         int  currentSeq = Integer.parseInt( this .currentPath.substring(LOCK_PATH.length()));
 
         // 上一个序号
         int  preSeq = currentSeq -  1 ;
 
         String preSeqStr = String.valueOf(preSeq);
         while  (preSeqStr.length() <  10 ) {
             preSeqStr =  "0"  + preSeqStr;
         }
         this .lockPath = LOCK_PATH + preSeqStr;
     }
 
}



参考代码:

https://juejin.im/entry/596438bc6fb9a06bb47495f1


本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1958619

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
437 2
|
24天前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
21 1
|
17天前
|
缓存 NoSQL 数据库
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
25 0
|
17天前
|
设计模式 监控 安全
一文搞懂:zookeeper实现分布式锁安全用法
一文搞懂:zookeeper实现分布式锁安全用法
13 0
|
24天前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
111 0
|
1月前
|
存储 监控 负载均衡
Zookeeper 详解:分布式协调服务的核心概念与实践
Zookeeper 详解:分布式协调服务的核心概念与实践
28 0
|
2月前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
2月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
2月前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
206 0
|
2月前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
445 0