二、RabbitMq 生产和消费
生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列
复制代码
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码
虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
//代码效果参考:https://v.youku.com/v_show/id_XNjQwNjgyNjU2MA==.html
channel=connection.channel()
声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')
//代码效果参考:https://v.youku.com/v_show/id_XNjQwNjgyNjU0MA==.html
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
向队列插入数值 routing_key是队列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
复制代码
消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息
复制代码
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
复制代码
三、RabbitMq 持久化
MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。
1.queue 声明持久化
声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 声明持久化
声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
3.消息持久化
虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。
向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丢失
消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。
channel.basic_consume(callback,queue = 'python-test',
no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
no_ack = False)