【RabbitMQ 服务器】
1
2
3
4
5
6
|
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest
Queue: queuetest
Routing key: rkeytest
|
【Python 环境】
1
2
3
|
OS: Windows 10
Python: 3.6.3 x64
pika: 0.11.2
|
【查看队列状态】
1
2
3
4
5
6
7
8
9
|
# 通过浏览器查看队列状态
http:
//192
.168.0.xx:15672
/api/queues/vhosttest/queuetest
# 通过命令行查看队列状态
curl -u user:password http:
//192
.168.0.xx:15672
/api/queues/vhosttest/queuetest
| jq
# 通过命令行查看队列长度
curl -s -u user:password http:
//192
.168.0.xx:15672
/api/queues/vhosttest/queuetest
| \
jq
'.backing_queue_status.len'
|
【send.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 发送方/生产者
import
os, sys, time
import
pika
def
Main():
credentials
=
pika.PlainCredentials(
"test"
,
"test"
)
parameters
=
pika.ConnectionParameters(host
=
"192.168.0.xx"
,
virtual_host
=
'vhosttest'
,
credentials
=
credentials)
connection
=
pika.BlockingConnection(parameters)
# 连接 RabbitMQ
channel
=
connection.channel()
# 创建频道
queue
=
channel.queue_declare(queue
=
'queuetest'
)
# 声明或创建队列
while
True
:
# 循环向队列中发送信息
message
=
time.strftime(
'%H:%M:%S'
, time.localtime())
channel.basic_publish(exchange
=
'exchangetest'
,
routing_key
=
'rkeytest'
,
body
=
message)
print
(
'send message: %s'
%
message)
while
True
:
# 检查队列,以重新得到消息计数
queue
=
channel.queue_declare(queue
=
'queuetest'
, passive
=
True
)
messageCount
=
queue.method.message_count
print
(
'messageCount: %d'
%
messageCount)
if
messageCount <
100
:
break
time.sleep(
1
)
# 关闭连接
connection.close()
if
__name__
=
=
'__main__'
:
Main()
|
【recv.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 接收方/消费者
import
os, sys, time
import
pika
# 接收处理消息的回调函数
def
ConsumerCallback (channel, method, properties, body):
print
(
"Received %s"
%
body)
def
Main():
credentials
=
pika.PlainCredentials(
"test"
,
"test"
)
parameters
=
pika.ConnectionParameters(host
=
"192.168.0.xx"
,
virtual_host
=
'vhosttest'
,
credentials
=
credentials)
connection
=
pika.BlockingConnection(parameters)
# 连接 RabbitMQ
channel
=
connection.channel()
# 创建频道
queue
=
channel.queue_declare(queue
=
'queuetest'
)
# 声明或创建队列
# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(ConsumerCallback, queue
=
'queuetest'
, no_ack
=
True
)
print
(
'Wait Message ...'
)
channel.start_consuming()
if
__name__
=
=
'__main__'
:
Main()
|
【相关阅读】
本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/2067244如需转载请自行联系原作者
RQSLT