消息队列
消息队列是在消息的传输过程中保存消息的容器
消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息
操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现
一、使用multiprocessing里面的Queue来实现消息队列
q = Queue()
q.put(data) #生产消息
data = q.get() #消费消息
例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from
multiprocessing
import
Queue, Process
def
write(q):
for
i
in
[
"a"
,
"b"
,
"c"
,
"d"
]:
q.put(i)
print
(
"put {0} to queue"
.
format
(i))
def
read(q):
while
1
:
result
=
q.get()
print
(
"get {0} from queue"
.
format
(result))
def
main():
q
=
Queue()
#定义一个消息队列容器
pw
=
Process(target
=
write,args
=
(q,))
#定义一个写的进程
pr
=
Process(target
=
read,args
=
(q,))
#定义一个读的进程
pw.start()
#启动进程
pr.start()
pw.join()
pr.terminate()
if
__name__
=
=
"__main__"
:
main()
|
运行结果:
put a to queue
put b to queueget a from queue
get b from queue
put c to queue
put d to queue
get c from queue
get d from queue
二、通过Multiprocessing里面的Pipe来实现消息队列
1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息
2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。
例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
from
multiprocessing
import
Process,Pipe
import
time
def
proc1(pipe):
for
i
in
xrange
(
1
,
10
):
pipe.send(i)
time.sleep(
3
)
print
(
"send {0} to pipe"
.
format
(i))
def
proc2(pipe):
n
=
9
while
n>
0
:
result
=
pipe.recv()
time.sleep(
3
)
print
(
"recv {0} from pipe"
.
format
(result))
n
-
=
1
if
__name__
=
=
"__main__"
:
pipe
=
Pipe(duplex
=
False
)
#定义并实例化一个管道
print
(
type
(pipe))
p1
=
Process(target
=
proc1,args
=
(pipe[
1
],))
#pipe[1],管道的右边,表示进入端,发送数据
p2
=
Process(target
=
proc2,args
=
(pipe[
0
],))
#pipe[0],管道的左边,表示出口端,接收数据
p1.start()
p2.start()
p1.join()
p2.join()
pipe[
0
].close()
pipe[
1
].close()
|
运行结果:
<type 'tuple'>
send 1 to pipe
recv 1 from pipe
send 2 to pipe
recv 2 from pipe
recv 3 from pipe
send 3 to pipe
send 4 to piperecv 4 from pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
recv 9 from pipesend 9 to pipe
三、Queue模块
python提供了Queue模块来专门实现消息队列:
Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。
Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠
Queue.full():判断消息是否满
Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
Queue.put_nowait(item):相当于put(item,False)
Queue.get(block=True,timeout=None):获取一个消息,其他等同put
以下两个函数用来判断消息对应的任务是否完成:
Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成
Queue.join():实际上意味着等到队列为空,再执行别的操作
例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
from
multiprocessing
import
Process, Pipe, Queue
import
time
from
threading
import
Thread
class
Proceduer(Thread):
def
__init__(
self
,queue):
super
(Proceduer,
self
).__init__()
# 超类
self
.queue
=
queue
#将queue赋给self.queue,便于类中其他函数调用
def
run(
self
):
try
:
for
i
in
xrange
(
1
,
10
):
print
(
"put data is: {0} to queue"
.
format
(i))
self
.queue.put(i)
except
Exception as e:
print
(
"put data error"
)
raise
e
class
Consumer_odd(Thread):
def
__init__(
self
,queue):
super
(Consumer_odd,
self
).__init__()
self
.queue
=
queue
def
run(
self
):
try
:
while
self
.queue.empty:
#判断消息队列是否为空
number
=
self
.queue.get()
#取到消息值
if
number
%
2
!
=
0
:
print
(
"get {0} from queue ODD"
.
format
(number))
else
:
self
.queue.put(number)
#将信息放回队列中
time.sleep(
1
)
except
Exception as e:
raise
e
class
Consumer_even(Thread):
def
__init__(
self
,queue):
super
(Consumer_even,
self
).__init__()
self
.queue
=
queue
def
run(
self
):
try
:
while
self
.queue.empty:
number
=
self
.queue.get()
if
number
%
2
=
=
0
:
print
(
"get {0} from queue Even,thread name is :{1}"
.
format
(number,
self
.getName()))
else
:
self
.queue.put(number)
time.sleep(
1
)
except
Exception as e:
raise
e
def
main():
queue
=
Queue()
#实例化一个消息队列
p
=
Proceduer(queue
=
queue)
#消息队列作为参数赋值给生产者函数,并实例化
p.start()
#启动一个带消息队列的函数
p.join()
#等待结束
time.sleep(
1
)
c1
=
Consumer_odd(queue
=
queue)
#消息队列作为参数赋值给消费者函数,并实例化
c2
=
Consumer_even(queue
=
queue)
#消息队列作为参数赋值给消费者函数,并实例化
c1.start()
c2.start()
c1.join()
c2.join()
print
(
"All threads terminate!"
)
if
__name__
=
=
"__main__"
:
main()
|
运行结果:
put data is: 1 to queue
put data is: 2 to queue
put data is: 3 to queue
put data is: 4 to queue
put data is: 5 to queue
put data is: 6 to queue
put data is: 7 to queue
put data is: 8 to queue
put data is: 9 to queue
get 1 from queue ODD
get 3 from queue ODD
get 4 from queue Even,thread name is :Thread-3
get 5 from queue ODD
get 7 from queue ODD
get 9 from queue ODD
get 2 from queue Even,thread name is :Thread-3
get 6 from queue Even,thread name is :Thread-3
get 8 from queue Even,thread name is :Thread-3
例子2:
1
2
3
4
5
6
7
8
9
|
import
Queue
q
=
Queue.Queue()
for
i
in
range
(
5
):
q.put(i)
while
not
q.empty():
print
q.get()
|
运行结果:
0
1
2
3
4