分布式锁简单封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
package
com.hellojd.cloud.locks.v1;
import
org.apache.zookeeper.*;
import
java.io.IOException;
import
java.util.concurrent.CountDownLatch;
import
static
org.apache.zookeeper.Watcher.Event.KeeperState.SyncConnected;
/**
* 最简单分布式锁实现方案
*/
public
class
ZookerSession
implements
Watcher {
private
CountDownLatch countDownLatch=
new
CountDownLatch(
1
);
private
ZooKeeper zooKeeper;
private
static
class
Singleton{
private
static
ZookerSession instance;
static
{
instance =
new
ZookerSession();
}
public
static
ZookerSession getInstance(){
return
instance;
}
}
public
static
ZookerSession getInstance(){
return
Singleton.getInstance();
}
public
ZookerSession() {
try
{
this
.zooKeeper =
new
ZooKeeper(
"192.168.0.10:2181"
,
50000
,
this
);
countDownLatch.await();
System.out.println(
"state: "
+zooKeeper.getState());
System.out.println(
"connection estalished!!"
);
}
catch
(IOException e) {
e.printStackTrace();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
public
ZooKeeper getZooKeeper(){
return
this
.zooKeeper;}
@Override
public
void
process(WatchedEvent event) {
if
(event.getState()==SyncConnected){
countDownLatch.countDown();
}
}
public
void
releaseDistributeLock(Long id){
String path =
"/lock_"
+id;
try
{
zooKeeper.delete(path,-
1
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(KeeperException e) {
e.printStackTrace();
}
}
public
void
acquireDistributeLock(Long id){
String path =
"/lock_"
+id;
try
{
zooKeeper.create(path,
null
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// System.out.println("success to acquire lock "+id);
}
catch
(KeeperException e) {
//没有获取到锁
int
count=
0
;
while
(
true
){
try
{
Thread.sleep(
200
);
zooKeeper.create(path,
null
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
catch
(InterruptedException e1) {
count++;
continue
;
}
catch
(KeeperException e1) {
// e1.printStackTrace();
count++;
continue
;
}
// System.out.println("success to acquire lock "+id);
break
;
}
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
2.测试线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package
com.hellojd.cloud.locks.v1;
import
org.apache.zookeeper.CreateMode;
import
org.apache.zookeeper.KeeperException;
import
org.apache.zookeeper.ZooDefs;
import
org.apache.zookeeper.data.Stat;
/**
 * Test worker: takes the distributed lock, then reads, prints and increments a counter stored
 * in the {@code /count} znode a fixed number of times, and finally releases the lock.
 */
public class Task implements Runnable {

    /** Path of the znode that stores the shared counter as a decimal string. */
    private static final String COUNTER_PATH = "/count";

    /** Charset used to encode/decode the counter; fixed so it does not depend on the platform. */
    private static final java.nio.charset.Charset COUNTER_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    private final int loop;
    private final Long lockid;
    ZookerSession zs;

    /**
     * @param loop number of read-increment-write rounds to perform while holding the lock
     * @param lockid identifier of the distributed lock guarding the counter
     */
    public Task(int loop, Long lockid) {
        zs = ZookerSession.getInstance();
        this.loop = loop;
        this.lockid = lockid;
    }

    /**
     * Runs the increments under the lock. The lock is released in a {@code finally} block so an
     * unexpected runtime exception inside the loop cannot leave the lock node behind.
     */
    @Override
    public void run() {
        zs.acquireDistributeLock(lockid);
        try {
            for (int i = 0; i < loop; i++) {
                int count = getcount();
                System.out.println(Thread.currentThread().getName() + "-->" + count);
                if (count < 0) {
                    // The read failed; writing count + 1 here would reset the shared counter to 0.
                    continue;
                }
                count(count + 1);
            }
        } finally {
            zs.releaseDistributeLock(lockid);
        }
    }

    /**
     * Stores {@code count} in the counter znode, creating the node if it does not exist yet.
     * ZooKeeper failures are reported but not propagated.
     *
     * @param count the value to store
     */
    public void count(int count) {
        byte[] payload = String.valueOf(count).getBytes(COUNTER_CHARSET);
        try {
            Stat s = zs.getZooKeeper().exists(COUNTER_PATH, false);
            if (s == null) {
                System.out.println("count 不存在");
                zs.getZooKeeper().create(
                        COUNTER_PATH, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                // Version -1 overwrites the data regardless of the node's current version.
                zs.getZooKeeper().setData(COUNTER_PATH, payload, -1);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reads the counter, creating the znode with value 0 if it does not exist yet.
     *
     * @return the current counter value, or {@code -1} if it could not be read or parsed
     */
    public int getcount() {
        try {
            Stat s = zs.getZooKeeper().exists(COUNTER_PATH, false);
            if (s == null) {
                zs.getZooKeeper().create(
                        COUNTER_PATH,
                        String.valueOf(0).getBytes(COUNTER_CHARSET),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
                return 0;
            }
            byte[] data = zs.getZooKeeper().getData(COUNTER_PATH, false, null);
            if (data == null) {
                return -1;
            }
            return Integer.parseInt(new String(data, COUNTER_CHARSET));
        } catch (NumberFormatException e) {
            // The node holds something that is not a decimal integer; report it as a failed read.
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return -1;
    }

    /**
     * Starts 100 threads that each perform one increment under the same lock.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final long lockId = 2L;
        final int threadCount = 100;
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Task(1, lockId)).start();
        }
    }
}
|
原理:分布式锁通过死循环 + 线程休眠(sleep)实现等待效果。如果线程可以创建节点,则成功获取锁;
释放锁:就是将数据节点在ZK上删除。
缺点:实现简单但性能低,并且有风险:如果获取锁的线程被中断,会导致任务异常。
本文转自 randy_shandong 51CTO博客,原文链接:http://blog.51cto.com/dba10g/1975120,如需转载请自行联系原作者