分布式锁有很多,redis也可以实现分布式锁,
http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式锁)
zookeeper分布式锁步骤:
1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。
2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。
3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。
4、这么做,释放锁的时候,也会通知下一个节点。
什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。
zookeeper的状态和事件类型,提前了解一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
状态 KeeperState.Disconnected (
0
) 断开
* KeeperState.SyncConnected (
3
) 同步连接状态
* KeeperState.AuthFailed (
4
) 认证失败状态
* KeeperState.ConnectedReadOnly (
5
) 只读连接状态
* KeeperState.SaslAuthenticated (
6
) SASL认证通过状态
* KeeperState.Expired (-
112
) 过期状态
*
*
// EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
* EventType.None (-
1
), 无
* EventType.NodeCreated (
1
),
* EventType.NodeDeleted (
2
),
* EventType.NodeDataChanged (
3
), 结点数据变化
* EventType.NodeChildrenChanged (
4
); 结点子节点变化
|
下面是代码,自己敲下,理解一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
package
com.lhcis.spider.system.annotation;
import
java.io.IOException;
import
java.io.UnsupportedEncodingException;
import
java.util.concurrent.CountDownLatch;
import
org.apache.zookeeper.CreateMode;
import
org.apache.zookeeper.KeeperException;
import
org.apache.zookeeper.WatchedEvent;
import
org.apache.zookeeper.Watcher;
import
org.apache.zookeeper.ZooDefs;
import
org.apache.zookeeper.ZooKeeper;
import
org.apache.zookeeper.data.Stat;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
/**
* @author sdc
*
*/
public
class
ZooDistributeLock
implements
Watcher {
private
static
final
Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.
class
);
private
static
final
String LOCK_PATH =
"/zkLock"
;
// 模拟开启的线程数
private
static
final
int
THREAD_NUM =
5
;
// 用于等待所有线程都连接成功后再执行任务
private
static
CountDownLatch startFlag =
new
CountDownLatch(
1
);
// 用于确保所有线程执行完毕
private
static
CountDownLatch threadFlag =
new
CountDownLatch(THREAD_NUM);
private
ZooKeeper zk =
null
;
private
String currentPath;
private
String lockPath;
public
static
void
main(String[] args) {
for
(
int
i =
0
; i < THREAD_NUM; i++) {
final
int
j = i;
new
Thread() {
@Override
public
void
run() {
ZooDistributeLock zooDistributeLock =
new
ZooDistributeLock();
try
{
zooDistributeLock.connection();
System.out.println(
"连接"
+ j);
zooDistributeLock.createNode();
System.out.println(
"创建"
+ j);
zooDistributeLock.getLock();
System.out.println(
"获取锁"
+ j);
}
catch
(IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}.start();
}
try
{
threadFlag.await();
LOG.info(
"所有线程执行完毕..."
);
}
catch
(InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
/**
* Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
* connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
* server连接触发的事件,此时watcher有效,也就是能正常感知
* Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
* Expired,此时watcher失效,也就是不能正常感知
*/
@Override
public
void
process(WatchedEvent event) {
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
if
(Event.KeeperState.SyncConnected == state) {
if
(Event.EventType.None == type) {
// 标识连接成功
LOG.info(
"成功连接上ZK服务器"
);
startFlag.countDown();
}
if
(Event.EventType.NodeDeleted == type && event.getPath().equals(
this
.lockPath)) {
LOG.info(
"node:"
+
this
.lockPath +
"的锁已经被释放"
);
try
{
// 上一个节点释放了,当前节点去获取锁
getLock();
}
catch
(KeeperException | InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
}
}
/**
* 连接到 ZK
*
* @throws IOException
*/
private
void
connection()
throws
IOException, InterruptedException {
zk =
new
ZooKeeper(
"127.0.0.1:2181"
,
5000
,
this
);
// 等待连接成功后再执行下一步操作
startFlag.await();
}
// 创建节点,并初始化当前路径
private
void
createNode()
throws
KeeperException, InterruptedException, UnsupportedEncodingException {
this
.currentPath =
this
.zk.create(LOCK_PATH,
""
.getBytes(
"UTF-8"
), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
private
void
getLock()
throws
KeeperException, InterruptedException {
if
(minNode()) {
doSomething();
// 释放锁
releaseLock();
}
}
/**
* 当前是否为最小节点
*
* @return
*/
private
boolean
minNode() {
// 当前序号
try
{
initLockPath();
// 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
// zk.getData(this.lockPath, this, new Stat());
zk.getData(
this
.lockPath,
true
,
new
Stat());
LOG.info(
this
.currentPath +
" 不是最小值,没有获取锁,等待 "
+
this
.lockPath +
" 释放锁"
);
return
false
;
}
catch
(KeeperException e) {
LOG.info(
this
.currentPath +
" 是最小值,获得锁"
);
return
true
;
}
catch
(InterruptedException e) {
LOG.error(e.getMessage(), e);
}
return
true
;
}
private
void
doSomething() {
LOG.info(
"处理业务逻辑..."
);
}
/**
* 释放锁并关闭连接
*
* @throws KeeperException
* @throws InterruptedException
*/
private
void
releaseLock()
throws
KeeperException, InterruptedException {
Thread.sleep(
2000
);
if
(
this
.zk !=
null
) {
LOG.info(
this
.currentPath +
" 业务处理完毕,释放锁..."
);
zk.delete(
this
.currentPath, -
1
);
this
.zk.close();
LOG.info(Thread.currentThread().getName() +
"关闭 zookeeper 连接"
);
}
threadFlag.countDown();
}
/**
* 初始化 lockpath
*/
private
void
initLockPath() {
int
currentSeq = Integer.parseInt(
this
.currentPath.substring(LOCK_PATH.length()));
// 上一个序号
int
preSeq = currentSeq -
1
;
String preSeqStr = String.valueOf(preSeq);
while
(preSeqStr.length() <
10
) {
preSeqStr =
"0"
+ preSeqStr;
}
this
.lockPath = LOCK_PATH + preSeqStr;
}
}
|
参考代码:
https://juejin.im/entry/596438bc6fb9a06bb47495f1
本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1958619