RabbitMQ 简介

简介:

RabbitMQ是干什么的呢?

     解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议。AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输。RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发送方传输到接收方。简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方。


     可靠的消息传输为什么一定要用RabbitMQ呢?直接用TCP,HTTP不OK?

     在回答这个问题时,我比较模糊。应该说这个应用的范围不同吧,TCP协议支持在IP之间进行消息传输,而RabbitMQ是根据关键字进行消息的分配和传输。TCP可以将消息从192.168.1.2传输到192.168.1.3。但是它不能将消息根据关键字进行传输吧,比如,给定一个关键字’key‘,你知道要将消息传输到哪吗?呵呵,RabbitMQ知道。


     怎么根据关键字发送消息呢?

     这个嘛!理解比较简单,但介绍起来有点长。要理解这个发送机制,首先要对RabbitMQ的几个定义搞清楚:

1 Server,要利用RabbitMQ进行消息传输,那么就得有一个运行的RabbitMQ服务,我们可以称为Server

2 Producer,既然是传输消息,那得有一个消息的发送者,这里我们成为生产者(Producer)

3 Consumer,消息的接收者,这里称为消息的消费这(Consumer)

4 Exchange,在生产者将消息发出后,消息往哪走呢?这个得由Exchange来决定,可以将它看作一个交换机,它 根据消息自带的特征信息(这里指的是routing_key),进行发送。发给谁呢?不是接收者,是下面的Queue。

5 Queue,一个Exchange可以对应多个Queue,每个Queue都会在定义时,声明自己要接收消息的特征信息(routing_key)。Exchange根据Queue和消息的routing_key的匹配情况进行发送。消息到达Queue后,Consumer就可以将消息从Queue取出了。

       一个Server可以声明n多Exchange,一个Exchange可以对应n多Queue。Exchange和Queue都是存在Server内部的。简化点,可以理解为一个Producer与一个Exchange绑定,Producer将消息交给Exchange,Exchange负责发送消息。一个Consumer与一个Queue绑定,当Queue中有消息时,直接取就行了,管它怎么来的。而消息在Exchange在Queue之间的怎么传递,是由RabbitMQ负责的。我们只需要将消息交给Exchange,然后在Queue中取就行了。(当然还要多点步骤:声明消息和Queue的特征信息,将Queue与Exchange进行关联)

     用一个来自官网的图片说明下:

RabbitMQ

P: Producer, X: Exchange, amq: Queue, C: Consumer

       在消息的传输机制上理解,较为简单。但在源码级别上,进行使用要为复杂。这个我个人要归功于RabbitMQ客户端的几种封装,让我是分不清东南西北。下面是官网上列举的六个应用实例,有易到难,比较容易理解。

1 直接发送模式

不声明Exchange,即采用默认的Exchange。默认的Exchange可以根据routing_key将消息直接发送给特定Queue。

注意:

1) 这种匹配是消息的routing_key与Queue的名子直接进行匹配,而不是与Queue的routing_key。

2) 消息并不能直接发送给queue,这里是经过一个名为''的Exchange进行发送的。

RabbitMQ

Producer.py

#!/usr/bin/env python 
import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
#这里要对queue进行声明,已确定Queue存在,如果不存在则创建名为‘hello'的queue。
#否则消息发送时,queue不存在,消息会被直接丢弃
channel.queue_declare(queue='hello') 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') 
print " [x] Sent 'Hello World!'" 
connection.close()

Consumer.py

#!/usr/bin/env python 
import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello') 
print ' [*] Waiting for messages. To exit press CTRL+C' 
def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 
channel.basic_consume(callback, queue='hello', no_ack=True) 
channel.start_consuming()

2 工作列队模式

该模型适用于分发资源密集型的任务。假设一下你需要进行10次计算圆周率的操作,每次计算到小数点后n位,每次耗时1一个小时(这只是一个假设,一般没有比要进行10重复操作的。现在就当作你真的有这个必要)。如果只用一台机器计算,需要十个小时。但如果我们将这十个任务分发给十台进行计算,那么只需一个小时。下面的模型就是适用于这种分发任务的。

RabbitMQ

采用这种模式,列队消息以轮询(round_robin)的方式将消息平均的发给所有与Queue关联的Consumer,一般情况下,每个Consumer都平均的分摊任务。

注意:

1) 在目前的情况下,消息一旦被Consumer取出,就立即从列队中消除。这样当woker执行到任务中途失败时,该任务的信息也丢失了,不能重新开始。

2) RabbitMQ提供一种消息认证机制(message acknowledgments),只有当Consumer返回一个ack时,它才会将消息从列队中删除。如果当Consumer断开连接时,依然没有收到ack,那么它就会重新分发给消息。

3) RabbitMQ不允许以新的属性来重新定义Queue,所以这里我们需要给Queue换个新名子

Producer.py

#!/usr/bin/env python 
import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
#Queue声明持久化
channel.queue_declare(queue='task_queue', durable=True) 
message = ' '.join(sys.argv[1:]) or "Hello World!" 
channel.basic_publish(exchange='', routing_key='task_queue', body=message, 
                                    properties=pika.BasicProperties( delivery_mode = 2, 消息声明持久化 )) 
print " [x] Sent %r" % (message,) 
connection.close()

Consumer.py

#!/usr/bin/env python 
import pika 
import time 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
#Queue声明持久化
channel.queue_declare(queue='task_queue', durable=True) 
print ' [*] Waiting for messages. To exit press CTRL+C' 
def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 
    time.sleep( body.count('.') ) 
    print " [x] Done" 
    #返回消息认证
    ch.basic_ack(delivery_tag = method.delivery_tag) 
channel.basic_qos(prefetch_count=1) 
channel.basic_consume(callback, queue='task_queue') 
channel.start_consuming()


3 广播(fanout)模型

前面两种模型,每条消息只能被一个Consumer获取。原因在于:使用默认的Exchange,它只能将每条消息发给一个或零个Queue中。这里我们将使用一种类型为fanout的Exchange,它可以将消息发送给每一个于它关联的Queue,这样每个Consumer都可以获取相同的消息。

RabbitMQ

Producer.py

#!/usr/bin/env python 
import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
#定义一个类型为fanout的Exchange
channel
.exchange_declare(exchange='logs', type='fanout') 
message = ' '.join(sys.argv[1:]) or "info: Hello World!" 
channel.basic_publish(exchange='logs', routing_key='', body=message) 
print " [x] Sent %r" % (message,) 
connection.close()

Consumer.py

                 #由于此时Exchange将消息发给所有的Queue,所以Queue无需命名, #此处没有给queue指定名称,MQ会给它产生一个随机名子。

4 direct模型

到目前为止,Consumer只能被动的随机接收一部分消息,或者接收全部,不能自主选择接收哪一部分消息。该消息路由模型,可以使得Consumer指定它想要接收到消息。

RabbitMQ

RabbitMQ

将Exchange声明为direct类型:

channel.exchange_declare(exchange='direct_logs', type='direct')

将Queue与Exchange绑定,注意此处queue与Exchange绑定时,指定了一个参数routing_key。如上面两图所示,一个queue可以以多个routing_key与Exchange进行绑定,多个不同的Queue可以以相同routing_key与同一个Exchange进行绑定。遗留一个问题,一个Queue可不可以与多个Exchange进行绑定呢?

channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')

发送消息:

channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)

在发送消息时,也会指定一个routing_key。当Exchange在决定将消息发给哪几个Queue时,它会将该routing_key与Queue绑定时指定的routing_key进行匹配,相同的Queue则可接收消息。

Producer.py

#!/usr/bin/env python 
import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
channel.exchange_declare(exchange='direct_logs', type='direct') 
severity = sys.argv[1if len(sys.argv) > 1 else 'info' 
message = ' '.join(sys.argv[2:]) or 'Hello World!' 
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) 
print " [x] Sent %r:%r" % (severity, message) 
connection.close()

Consumer.py

#!/usr/bin/env python 
import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
channel.exchange_declare(exchange='direct_logs', type='direct') 
result = channel.queue_declare(exclusive=True) 
queue_name = result.method.queue 
severities = sys.argv[1:] 
if not severities: 
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[0],) 
    sys.exit(1) 
for severity in severities: 
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) 
print ' [*] Waiting for logs. To exit press CTRL+C' 

def
 callback(ch, method, properties, body): 
    print " [x] %r:%r" % (method.routing_key, body,)  

channel
.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

5 Topic模型

direct模型是不是已经很好用、很灵活了?不,它的灵活度还不够。看它在routing_key进行匹配时,只能将两个完全相同的routing_key进行匹配,这个不够好,要是能用正则表达式进行匹配,那就完美了。

Topic模型,虽然没有实现用正则表达式进行匹配,但是它进步了一小步。实现了对任意的单词进行匹配:

  • * 

    (星号) 可以匹配任意一个单词

  • # 

    (警号) 可以匹配任意零个或多个单词

RabbitMQ

Producer.py

#!/usr/bin/env python 
import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 
channel = connection.channel() 
channel.exchange_declare(exchange='topic_logs', type='topic') 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 
message = ' '.join(sys.argv[2:]) or 'Hello World!' 
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) 
print " [x] Sent %r:%r" % (routing_key, message) 
connection.close()

Consumer.py

                                                                    [object Object]

6 远程调用模型(RPC)

这个是对RabbitMQ消息传输的实际应用,在Openstack中,各个组件间的调用就是通过RabbitMQ实现的,这样是我为什么学习RabbitMQ的原因了。

RabbitMQ

个人猜想,Client端执行远程调用(RPC CALL),通过一个Queue将函数名、传参、已经用于传输返回结果的临时声明的Queue(这个Queue在声明时,无需指定名子,由RabbitMQ自动分配,这样还可以避免命名冲突),Server端接收到消息后,调用相应的函数进行处理,并将结果通过默认Exchange传给临时Queue,这样就完成了一个远程调用。

但是这个猜想有问题:

 每进行一次RPC CALL就要声明一个临时Queue。这个有点浪费。有多浪费我也不知道,没测试过。

我们可以为每个Client指定一个用于返回结果的Queue,这样就不用每次声明了。每次RPC CALL时,绑定一个correlation_id,这样使得返回的结果不会混乱。

注意:

在返回结果的Queue中可能存在脏数据,比如,Server在将结果传输到Queue后,还没来得及返回消息确认就挂了。那么先前发的调用消息就不会消除,在Server下次启动时会再次执行,并再次返回结果。这就有了脏数据。所以,面对Queue里的脏数据,我们只需忽略就行了。

Server.py

                                         [object Object]

Client.py

#!/usr/bin/env python 
import pika 
import uuid 
class FibonacciRpcClient(object): 
    def __init__(self): 
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
        self.channel = self.connection.channel() 
        result = self.channel.queue_declare(exclusive=True) 
        self.callback_queue = result.method.queue 
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) 
    def on_response(self, ch, method, props, body): 
         if self.corr_id == props.correlation_id: 
             self.response = body 
    def call(self, n): 
        self.response = None 
        self.corr_id = str(uuid.uuid4()) 
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id =self.corr_id, ), body=str(n)) 
       while self.response is None: 
           self.connection.process_data_events()
           return int(self.response)

fibonacci_rpc
 = FibonacciRpcClient() 
print " [x] Requesting fib(30)" 
response = fibonacci_rpc.call(30) 
print " [.] Got %r" % (response,)









本文转自 落花非有意  51CTO博客,原文链接:http://blog.51cto.com/1992zhong/1686350,如需转载请自行联系原作者
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2天前
|
消息中间件 传感器 网络协议
阿里云MQTT简介和使用流程
以下是内容的摘要: 该文主要介绍了在阿里云上搭建 MQTT 服务器的步骤。首先,需要注册阿里云账号并进行实名认证。然后,购买阿里云 MQTT 实例,选择合适的类型、地域、连接和消息限制。接着,创建产品和设备,命名并上线,获取 MQTT 连接的相关信息,包括 ProductKey、DeviceName 和 DeviceSecret。通过提供的 MQTT.fx 工具,设置 MQTT 客户端连接参数,包括 Broker 地址、端口、用户名和密码。最后,使用 MQTT.fx 测试连接,实现数据的上报和接收,验证 MQTT 服务器的配置是否成功。
|
6月前
|
消息中间件 Linux 虚拟化
消息中间件系列教程(04) -RabbitMQ -简介&安装
消息中间件系列教程(04) -RabbitMQ -简介&安装
38 0
|
2天前
|
消息中间件 存储 中间件
RabbitMq简介
RabbitMq简介
|
6月前
|
传感器 负载均衡 网络协议
01 MQTT简介
01 MQTT简介
44 0
|
9月前
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
88 0
|
10月前
|
存储 安全 数据可视化
阿里云mqtt简介和优惠购买流程
MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,它可以在不同的设备和系统之间传递信息。阿里云是中国市场主流的云计算服务提供商,它提供了MQTT服务来支持IoT(Internet of Things)设备的通信。
|
11月前
|
消息中间件 传感器 网络协议
动手写物联网平台(二、物联网和MQTT协议简介)
动手写物联网平台(二、物联网和MQTT协议简介)
|
11月前
|
消息中间件 传感器 网络协议
阿里云MQTT简介和使用流程
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
|
物联网 Windows
MQTT协议简介2
MQTT协议简介
116 0
|
消息中间件 存储 传感器
MQTT协议简介
MQTT协议简介
431 0