Python之RabbitMQ操作

简介: Python之RabbitMQ操作

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

实现的协议:AMQP。

 

术语(Jargon)

 

P,Producing,制造和发送信息的一方。

Queue,消息队列。

C,Consuming,接收消息的一方。

 

RabbitMQ安装

1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3  
4 安装erlang
5    $ yum -y install erlang
6  
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server

安装rabbitmq API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ######################### 生产者 #########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 channel.basic_publish(exchange='',
13                       routing_key='hello',
14                       body='Hello World!')
15 print(" [x] Sent 'Hello World!'")
16 connection.close()
1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ########################## 消费者 ##########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14  
15 channel.basic_consume(callback,
16                       queue='hello',
17                       no_ack=True)
18  
19 print(' [*] Waiting for messages. To exit press CTRL+C')
20 channel.start_consuming()

 

1、acknowledgment 消息不丢失(订阅端消息不丢失)

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

消费者

生产者

 

2、durable   消息不丢失(服务端消息不丢失)

消费者

生产者

 

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列

消费者

生产者

 

4、发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout

发布者

订阅者

 

5、关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

发布者

订阅在1

订阅在2

 

6、模糊匹配

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词

订阅者

发布者

 

注:

订阅/发布Demo

 

发送消息给多个订阅者

核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。

 

Exchanges

 

在RabbitMQ完整的模型中,消息只能发送给一个exchange。

exchange一方面接收消息,另一方面push给queues。

 

exchange类型

> rabbitmqctl list_exchanges

direct

topic

headers

fanout 广播消息给已知队列

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
242 2
|
27天前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
4月前
|
消息中间件 监控 调度
Celery与RabbitMQ的结合【Python】
【8月更文挑战第18天】 Celery与RabbitMQ结合是构建高效Python分布式系统的利器。Celery作为分布式任务队列,支持任务调度与结果管理;RabbitMQ则确保了消息的可靠传递。二者联用不仅提升了系统的异步处理能力,还增强了其扩展性与可靠性。通过简单的安装与配置,即可实现任务的异步执行与调度,同时利用监控工具优化性能并确保安全性。这种组合适用于需要处理大量异步任务的应用场景,极大地简化了分布式系统的设计与实现。
90 0
|
6月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
6月前
|
消息中间件 存储 JSON
python 操作RabbitMq详细操作分享
python 操作RabbitMq详细操作分享
115 0
|
7月前
|
消息中间件 数据采集 Python
2024年Python最全使用python的pika链接rabbitMq断裂_pika,BTAJ面试有关散列(哈希)表的面试题详解
2024年Python最全使用python的pika链接rabbitMq断裂_pika,BTAJ面试有关散列(哈希)表的面试题详解
2024年Python最全使用python的pika链接rabbitMq断裂_pika,BTAJ面试有关散列(哈希)表的面试题详解
|
物联网 Python
如何通过示例在Python中使用Paho MQTT客户端?
如何通过示例在Python中使用Paho MQTT客户端?
399 2
如何通过示例在Python中使用Paho MQTT客户端?
|
7月前
|
消息中间件 安全 Docker
Docker中部署RabbitMQ并使用Python3.x操作全书(Python操作RabbitMQ看这一篇就够了)
Docker中部署RabbitMQ并使用Python3.x操作全书(Python操作RabbitMQ看这一篇就够了)
98 0
|
消息中间件 监控 NoSQL
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用[Django+Celery+Redis实现了异步任务队列](https://v3u.cn/a_id_54),只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
下一篇
DataWorks