zookeeper 分布式锁

简介:

分布式锁有很多,redis也可以实现分布式锁,

http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式锁)


zookeeper分布式锁步骤:

1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。

2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。

3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。

4、这么做,释放锁的时候,也会通知下一个节点。


什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。


zookeeper的状态和事件类型,提前了解一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
状态 KeeperState.Disconnected ( 0 )  断开
  * KeeperState.SyncConnected ( 3 )  同步连接状态
  * KeeperState.AuthFailed ( 4 ) 认证失败状态
  * KeeperState.ConnectedReadOnly ( 5 )  只读连接状态
  * KeeperState.SaslAuthenticated ( 6 ) SASL认证通过状态
  * KeeperState.Expired (- 112 )  过期状态
 
  // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
  * EventType.None (- 1 ), 无
  * EventType.NodeCreated ( 1 ),
  * EventType.NodeDeleted ( 2 ),
  * EventType.NodeDataChanged ( 3 ),  结点数据变化
  * EventType.NodeChildrenChanged ( 4 ); 结点子节点变化


下面是代码,自己敲下,理解一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package  com.lhcis.spider.system.annotation;
 
import  java.io.IOException;
import  java.io.UnsupportedEncodingException;
import  java.util.concurrent.CountDownLatch;
 
import  org.apache.zookeeper.CreateMode;
import  org.apache.zookeeper.KeeperException;
import  org.apache.zookeeper.WatchedEvent;
import  org.apache.zookeeper.Watcher;
import  org.apache.zookeeper.ZooDefs;
import  org.apache.zookeeper.ZooKeeper;
import  org.apache.zookeeper.data.Stat;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
 
/**
  * @author sdc
  *
  */
public  class  ZooDistributeLock  implements  Watcher {
 
     private  static  final  Logger LOG = LoggerFactory.getLogger(ZooDistributeLock. class );
 
     private  static  final  String LOCK_PATH =  "/zkLock" ;
 
     // 模拟开启的线程数
     private  static  final  int  THREAD_NUM =  5 ;
 
     // 用于等待所有线程都连接成功后再执行任务
     private  static  CountDownLatch startFlag =  new  CountDownLatch( 1 );
 
     // 用于确保所有线程执行完毕
     private  static  CountDownLatch threadFlag =  new  CountDownLatch(THREAD_NUM);
 
     private  ZooKeeper zk =  null ;
 
     private  String currentPath;
 
     private  String lockPath;
 
     public  static  void  main(String[] args) {
         for  ( int  i =  0 ; i < THREAD_NUM; i++) {
             final  int  j = i;
             new  Thread() {
                 @Override
                 public  void  run() {
                     ZooDistributeLock zooDistributeLock =  new  ZooDistributeLock();
                     try  {
                         zooDistributeLock.connection();
                         System.out.println( "连接"  + j);
                         zooDistributeLock.createNode();
                         System.out.println( "创建"  + j);
                         zooDistributeLock.getLock();
                         System.out.println( "获取锁"  + j);
                     catch  (IOException | InterruptedException | KeeperException e) {
                         e.printStackTrace();
                     }
                 }
             }.start();
         }
         try  {
             threadFlag.await();
             LOG.info( "所有线程执行完毕..." );
         catch  (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
     /**
      * Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
      * connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
      * server连接触发的事件,此时watcher有效,也就是能正常感知
      * Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
      * Expired,此时watcher失效,也就是不能正常感知
      */
     @Override
     public  void  process(WatchedEvent event) {
 
         Event.KeeperState state = event.getState();
         Event.EventType type = event.getType();
 
         if  (Event.KeeperState.SyncConnected == state) {
             if  (Event.EventType.None == type) {
                 // 标识连接成功
                 LOG.info( "成功连接上ZK服务器" );
                 startFlag.countDown();
             }
 
             if  (Event.EventType.NodeDeleted == type && event.getPath().equals( this .lockPath)) {
                 LOG.info( "node:"  this .lockPath +  "的锁已经被释放" );
                 try  {
                     // 上一个节点释放了,当前节点去获取锁
                     getLock();
                 catch  (KeeperException | InterruptedException e) {
                     LOG.error(e.getMessage(), e);
                 }
             }
         }
 
     }
 
     /**
      * 连接到 ZK
      *
      * @throws IOException
      */
     private  void  connection()  throws  IOException, InterruptedException {
 
         zk =  new  ZooKeeper( "127.0.0.1:2181" 5000 this );
 
         // 等待连接成功后再执行下一步操作
         startFlag.await();
     }
 
     // 创建节点,并初始化当前路径
     private  void  createNode()  throws  KeeperException, InterruptedException, UnsupportedEncodingException {
         this .currentPath =  this .zk.create(LOCK_PATH,  "" .getBytes( "UTF-8" ), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     }
 
     private  void  getLock()  throws  KeeperException, InterruptedException {
         if  (minNode()) {
             doSomething();
             // 释放锁
             releaseLock();
         }
     }
 
     /**
      * 当前是否为最小节点
      *
      * @return
      */
     private  boolean  minNode() {
 
         // 当前序号
         try  {
             initLockPath();
             // 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
             // zk.getData(this.lockPath, this, new Stat());
             zk.getData( this .lockPath,  true new  Stat());
             LOG.info( this .currentPath +  " 不是最小值,没有获取锁,等待 "  this .lockPath +  " 释放锁" );
             return  false ;
         catch  (KeeperException e) {
             LOG.info( this .currentPath +  " 是最小值,获得锁" );
             return  true ;
         catch  (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
         return  true ;
     }
 
     private  void  doSomething() {
         LOG.info( "处理业务逻辑..." );
     }
 
     /**
      * 释放锁并关闭连接
      *
      * @throws KeeperException
      * @throws InterruptedException
      */
     private  void  releaseLock()  throws  KeeperException, InterruptedException {
         Thread.sleep( 2000 );
         if  ( this .zk !=  null ) {
             LOG.info( this .currentPath +  " 业务处理完毕,释放锁..." );
             zk.delete( this .currentPath, - 1 );
             this .zk.close();
             LOG.info(Thread.currentThread().getName() +  "关闭 zookeeper 连接" );
         }
         threadFlag.countDown();
     }
 
     /**
      * 初始化 lockpath
      */
     private  void  initLockPath() {
 
         int  currentSeq = Integer.parseInt( this .currentPath.substring(LOCK_PATH.length()));
 
         // 上一个序号
         int  preSeq = currentSeq -  1 ;
 
         String preSeqStr = String.valueOf(preSeq);
         while  (preSeqStr.length() <  10 ) {
             preSeqStr =  "0"  + preSeqStr;
         }
         this .lockPath = LOCK_PATH + preSeqStr;
     }
 
}



参考代码:

https://juejin.im/entry/596438bc6fb9a06bb47495f1


本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1958619

相关文章
|
2月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
12月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
328 11
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
215 2
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
268 1
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
210 0
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)