1 Producer-Consumer Pattern
Producer-Consumer Pattern主要就是在生产者与消费者之间建立一个“桥梁参与者”,用来解决生产者线程与消费者线程之间速度的不匹配。
当要从某个线程Produccer参与者将数据传输给其它线程Consumer参与者的时候,此时就可以在中间加一个Channel参与者,在Channel参与者中以某种方式存放接受的数据,再以某方式来获取收到的数据,Channel就可以来缓存两个线程之间传输的数据,在Channel参与者为了保证安全性,也要用Guarded Suspension Pattern模式。
Channel参与者作为一个中间者,当Channel参与者从Producer参与者接收到数据,可以用三种方式将数据按顺序 传递给Consumer参与者。
1 队列,这是一种按照FIFO的方式存储数据,即最先到达的数据最先传输给Consumer参与者。在Java中,可以利用数组形式来存放,每次从数组下标最前端获取数据,而从数组下标最后端来缓存数据。也可以利用LinkedList来存放,每次缓存数据的时候,利用LinkedList.addLast(obj),每次获取数据的时候利用LinkedList.removeFirst();移除并且返回队列的第一个元素。
2 堆栈,这是一种以LIFO的方式存储数据,即最先到达的数据最后传输给Consumer参与者。在Java中,对于堆栈的实现,可以直接使用LinkedList,利用pop()从栈顶弹出一个数据来获取数据,利用push(obj)来向堆栈中缓存一个数据
3 优先级队列,对于缓存的数据设置一些优先级来存储。
生产者与消费者模式,其实就是线程之间的合作关系,同时又包含了互斥关系。所谓的合作就是生产者生成产品,提供消费者消费。所谓的互斥就是生产者和消费者对于中间的缓冲区是互斥访问的。
实例:
几个厨师制作食物,将物品放置在桌子上,但是桌子放置的盘子有限,消费者可以从桌子上获取食物来吃。当桌子上有空位置的时候,厨师就可以继续放置做好的食物,且通知消费者来吃,但是满了就只能一直等待消费者吃了有空的位置。而消费者每次取食物的时候,如果桌子上面有食物,则就取走,并且通知厨师来做食物,如果没有则就等待。
生产者Producer代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package
whut.producer;
import
java.util.Random;
public
class
MakerThread
extends
Thread{
private
final
Table table;
private
final
Random random;
private
static
int
id=
0
;
public
MakerThread(String name,Table table,
long
seed)
{
super
(name);
this
.table=table;
this
.random=
new
Random(seed);
}
public
void
run()
{
try
{
while
(
true
)
{
Thread.sleep(random.nextInt(
1000
));
String cake=
" [Cake No."
+nextId()+
" by "
+Thread.currentThread().getName()+
"]"
;
table.put(cake);
}
}
catch
(InterruptedException e)
{
}
}
//为了使得所有实例共享该字段
public
static
synchronized
int
nextId()
{
return
id++;
}
}
|
消费者Consumer代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
whut.producer;
import
java.util.Random;
public
class
EaterThread
extends
Thread{
private
final
Table table;
private
final
Random random;
public
EaterThread(String name,Table table,
long
seed)
{
super
(name);
this
.table=table;
this
.random=
new
Random(seed);
}
public
void
run()
{
try
{
while
(
true
)
{
String cake=table.take();
Thread.sleep(random.nextInt(
1000
));
}
}
catch
(InterruptedException e)
{
}
}
}
|
Channel中间缓冲区,关键部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package
whut.producer;
public
class
Table {
private
final
String[] cakes;
//利用数组来作为缓冲区
private
int
head;
//下一次蛋糕取的位置
private
int
tail;
//下一次蛋糕放置位置
private
int
count;
//桌子上蛋糕的总数
public
Table(
int
count)
{
this
.cakes=
new
String[count];
this
.head=
0
;
this
.tail=
0
;
this
.count=
0
;
}
public
synchronized
void
put(String cake)
throws
InterruptedException
{
System.out.println(Thread.currentThread().getName()+
" puts "
+cake);
while
(count>=cakes.length)
{
System.out.println(Thread.currentThread().getName()+
" Begin wait...."
);
wait();
System.out.println(Thread.currentThread().getName()+
" End wait...."
);
}
cakes[tail]=cake;
tail=(tail+
1
)%cakes.length;
count++;
notifyAll();
}
//取蛋糕
public
synchronized
String take()
throws
InterruptedException
{
while
(count<=
0
)
{
System.out.println(Thread.currentThread().getName()+
" Begin wait...."
);
wait();
System.out.println(Thread.currentThread().getName()+
" End wait...."
);
}
String cake=cakes[head];
head=(head+
1
)%cakes.length;
count--;
notifyAll();
System.out.println(Thread.currentThread().getName()+
" gets "
+cake);
return
cake;
}
}
|
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package
whut.producer;
public
class
MainTest {
public
static
void
main(String[] args) {
// TODO Auto-generated method stub
Table table=
new
Table(
3
);
new
MakerThread(
"MakerThread-1"
,table,
31415
).start();
new
MakerThread(
"MakerThread-2"
,table,
92653
).start();
new
MakerThread(
"MakerThread-3"
,table,
58979
).start();
new
EaterThread(
"EaterThread-1"
,table,
32384
).start();
new
EaterThread(
"EaterThread-2"
,table,
32384
).start();
new
EaterThread(
"EaterThread-3"
,table,
32384
).start();
//可以通过调用interrupt来去中断结束任何线程
}
}
|