内部锁(对象监视器)使用Object类的wait(), notify(), notifyAll()方法来进行线程之间的协作通信;Lock锁引入Condition来支持线程之间通信协作,Condition提供await(), signal(), signalAll()方法实现与内部锁同样的等待与唤醒功能,但与内部锁不同的是一个Lock可以绑定多个Condition,以满足不同条件下唤醒不同线程的功能。
最典型的线程协作例子就是生产者与消费者。如果使用内部锁控制线程通信,所有线程,不管生产者还是消费者,都被同一个对象监视,当新生产的对象放入消费队列后,生产者会唤醒所有的线程,包括其它生产者,这时队列可能已经满了,所以被唤醒的生产者只好继续回头进入等待队列,一直轮询直到唤醒一个消费者才会进行消费,然后继续。因为Lock支持绑定多个Condition,所以如果使用Lock实现,可以为当队列有对象可消费创建一个Condition,当队列有空位创建一个Condition,这样,生产者和消费者可以相互明确告知,而不是像内部锁广播的方式而缺乏明确的目标对象。
简而言之,内部锁的notify()与notifyAll()会唤醒任何一个线程,不管线程的等待条件是否已经满足;Condition则提供了针对不同类型线程定制唤醒条件的实现,减少无谓的唤醒。
另外,经过测试,在多核情况下,一个线程还可能被一些其它的条件唤醒,所以测试结果并不一定会满意,有待细究。
内部锁与Lock方式实现的生产者与消费者代码案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
import
java.util.LinkedList;
import
java.util.Queue;
import
java.util.concurrent.TimeUnit;
import
java.util.concurrent.locks.Condition;
import
java.util.concurrent.locks.Lock;
import
java.util.concurrent.locks.ReentrantLock;
public
class
TestLockCondition {
private
static
final
int
MAX_SIZE =
3
;
private
static
final
int
PRODUCER_NUM =
5
;
private
static
final
int
CONSUMER_NUM =
1
;
private
Queue<Object> queue =
new
LinkedList<Object>();
private
volatile
boolean
isWorkingTime =
true
;
private
volatile
boolean
isUsingLock =
true
;
private
Lock lock =
new
ReentrantLock();
private
Condition full = lock.newCondition();
private
Condition empty = lock.newCondition();
private
void
produceWithLock()
throws
InterruptedException{
lock.lock();
try
{
while
(queue.size() >= MAX_SIZE){
System.out.println(Thread.currentThread().getName() +
" go wait."
);
full.await();
System.out.println(Thread.currentThread().getName() +
" was awaken."
);
}
queue.offer(
new
Object());
System.out.println(Thread.currentThread().getName() +
" produce one object."
);
empty.signalAll();
//empty.signal();
}
finally
{
lock.unlock();
}
}
private
void
consumeWithLock()
throws
InterruptedException{
lock.lock();
try
{
while
(queue.isEmpty()){
System.out.println(Thread.currentThread().getName() +
" go wait."
);
empty.await();
System.out.println(Thread.currentThread().getName() +
" was awaken."
);
}
queue.poll();
System.out.println(Thread.currentThread().getName() +
" consume one object."
);
full.signalAll();
//full.signal();
}
finally
{
lock.unlock();
}
}
private
synchronized
void
produce()
throws
InterruptedException{
while
(queue.size() >= MAX_SIZE){
System.out.println(Thread.currentThread().getName() +
" go wait."
);
wait();
System.out.println(Thread.currentThread().getName() +
" was awaken."
);
}
queue.offer(
new
Object());
System.out.println(Thread.currentThread().getName() +
" produce one object."
);
//notifyAll();
notify();
}
private
synchronized
void
consume()
throws
InterruptedException{
while
(queue.isEmpty()){
System.out.println(Thread.currentThread().getName() +
" go wait."
);
wait();
System.out.println(Thread.currentThread().getName() +
" was awaken."
);
}
queue.poll();
System.out.println(Thread.currentThread().getName() +
" consume one object."
);
//notifyAll();
notify();
}
public
static
void
main(String[] args)
throws
InterruptedException {
TestLockCondition testLockCondition =
new
TestLockCondition();
Thread[] producers =
new
Thread[PRODUCER_NUM];
for
(
int
i=
0
; i<PRODUCER_NUM; i++){
producers[i] =
new
Thread(testLockCondition.
new
Producer());
producers[i].start();
}
Thread[] consumers =
new
Thread[CONSUMER_NUM];
for
(
int
i=
0
; i<CONSUMER_NUM; i++){
consumers[i] =
new
Thread(testLockCondition.
new
Consumer());
consumers[i].start();
}
TimeUnit.SECONDS.sleep(
10
);
for
(
int
i=
0
; i<PRODUCER_NUM; i++){
System.out.println(producers[i].getName() +
" : "
+ producers[i].getState());
producers[i].interrupt();
}
for
(
int
i=
0
; i<CONSUMER_NUM; i++){
System.out.println(consumers[i].getName() +
" : "
+ consumers[i].getState());
consumers[i].interrupt();
}
testLockCondition.isWorkingTime =
false
;
}
private
class
Consumer
implements
Runnable{
@Override
public
void
run() {
try
{
while
(isWorkingTime){
if
(isUsingLock){
//System.out.println(Thread.currentThread().getName() + " is using Lock.");
consumeWithLock();
}
else
{
consume();
}
}
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
private
class
Producer
implements
Runnable{
@Override
public
void
run() {
try
{
while
(isWorkingTime){
if
(isUsingLock){
//System.out.println(Thread.currentThread().getName() + " is using Lock.");
produceWithLock();
}
else
{
produce();
}
}
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
本文转自sarchitect 51CTO博客,原文链接http://blog.51cto.com/stevex/1300223,如需转载请自行联系原作者