python 操作RabbitMq详细操作分享

简介: python 操作RabbitMq详细操作分享

二、RabbitMq 生产和消费
生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列

复制代码
import pika
import json

credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码

虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。

connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))

//代码效果参考:https://v.youku.com/v_show/id_XNjQwNjgyNjU2MA==.html
channel=connection.channel()

声明消息队列,消息将在这个队列传递,如不存在,则创建

result = channel.queue_declare(queue = 'python-test')
//代码效果参考:https://v.youku.com/v_show/id_XNjQwNjgyNjU0MA==.html

for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})

向队列插入数值 routing_key是队列名

channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)

connection.close()
复制代码

消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息

复制代码
import pika

credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()

申明消息队列,消息在这个队列传递,如果不存在,则创建队列

channel.queue_declare(queue = 'python-test', durable = False)

定义一个回调函数来处理消息队列中的消息,这里是打印出来

def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())

告诉rabbitmq,用callback来接收消息

channel.basic_consume('python-test',callback)

开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理

channel.start_consuming()
复制代码

三、RabbitMq 持久化
MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。

1.queue 声明持久化

声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储

result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 声明持久化

声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储

channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。

3.消息持久化

虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化

channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
                      properties=pika.BasicProperties(delivery_mode = 2))

4.acknowledgement 消息不丢失

消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

channel.basic_consume(callback,queue = 'python-test',

no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉

                  no_ack = False)
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 缓存
Python之RabbitMQ操作
Python之RabbitMQ操作
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
6月前
|
存储 监控 数据处理
💻Python高手必备!文件系统操作秘籍,让你的数据存取如臂使指
【7月更文挑战第29天】在数据驱动时代, Python以简洁语法、丰富库生态和强大跨平台能力, 成为数据科学等领域首选。本文探讨Python文件系统操作秘籍, 助力高效数据处理。
56 11
|
6月前
|
监控 网络协议 网络安全
SMTP操作使用详解并通过python进行smtp邮件发送示例
SMTP操作使用详解并通过python进行smtp邮件发送示例
173 3
|
6月前
|
数据挖掘 数据处理 Python
🔍深入Python系统编程腹地:文件系统操作与I/O管理,打造高效数据处理流水线
【7月更文挑战第29天】深入Python系统编程腹地:文件系统操作与I/O管理,打造高效数据处理流水线
44 3
|
6月前
|
安全 数据安全/隐私保护 Python
|
6月前
|
Serverless 语音技术 开发工具
函数计算操作报错合集之怎么何集成nls tts python sdk
在使用函数计算服务(如阿里云函数计算)时,用户可能会遇到多种错误场景。以下是一些常见的操作报错及其可能的原因和解决方法,包括但不限于:1. 函数部署失败、2. 函数执行超时、3. 资源不足错误、4. 权限与访问错误、5. 依赖问题、6. 网络配置错误、7. 触发器配置错误、8. 日志与监控问题。
|
6月前
|
API Python
Python高手修炼手册:精通文件系统操作,掌控I/O管理,提升编程效率
【7月更文挑战第30天】在 Python 编程中, 文件系统操作与 I/O 管理是连接程序与数据的关键。初学者常因路径错误和权限问题受挫, 而高手能自如管理文件。传统 `os` 和 `os.path` 模块易出错, `pathlib` 提供了更直观的对象导向 API。I/O 方面, 同步操作会阻塞程序, 异步 (如使用 `aiofiles`) 则能大幅提升并发能力。真正的高手不仅掌握 API, 更能预见性能瓶颈并优化代码, 实现高效与优雅。
50 1