RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。
实现的协议:AMQP。
术语(Jargon)
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
RabbitMQ安装
1 安装配置epel源 2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 3 4 安装erlang 5 $ yum -y install erlang 6 7 安装RabbitMQ 8 $ yum -y install rabbitmq-server
安装rabbitmq API
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
1 #!/usr/bin/env python 2 import pika 3 4 # ######################### 生产者 ######################### 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 channel.basic_publish(exchange='', 13 routing_key='hello', 14 body='Hello World!') 15 print(" [x] Sent 'Hello World!'") 16 connection.close()
1 #!/usr/bin/env python 2 import pika 3 4 # ########################## 消费者 ########################## 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 15 channel.basic_consume(callback, 16 queue='hello', 17 no_ack=True) 18 19 print(' [*] Waiting for messages. To exit press CTRL+C') 20 channel.start_consuming()
1、acknowledgment 消息不丢失(订阅端消息不丢失)
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
消费者
生产者
2、durable 消息不丢失(服务端消息不丢失)
消费者
生产者
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列
消费者
生产者
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
发布者
订阅者
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
发布者
订阅在1
订阅在2
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
订阅者
发布者
注:
订阅/发布Demo
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
Exchanges
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列