RabbitMQ详解(三)

简介:

一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)

一、分发到多Consumer(fanout)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1.发送消息流程:
    1.Producer发送的Message实际上是发到了Exchange中。
    2.Exchanges从Producer接收message投递到queue中
    3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式

Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',  
    type='fanout')   //创建一个名字为logs,类型为fanout的Exchange:

1
2
3
4
5
6
7
8
9
10
11
[root@node 112  ~]# rabbitmqctl list_exchanges //查看所有的Exchanges
Listing exchanges ...
logs  fanout
amq.direct    direct
amq.fanout    fanout
amq.headers    headers
amq.match    headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace    topic
amq.topic    topic
...done.

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。 

通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',  
    routing_key='',  
    body=message)  

2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
    result = channel.queue_declare() 
    通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
    2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
    result = channel.queue_declare(exclusive=True)   //每次获取的都是新的,单独使用的
    
3.Bindings绑定
    创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
    channel.queue_bind(exchange='logs',  
        queue=result.method.queue)      
    使用命令rabbitmqctl list_bindings 查看bindings
    
4.最终代码
拓扑图:
1.png

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
     host= 'localhost' ))
channel = connection.channel()
channel.exchange_declare(exchange= 'logs' ,
     type= 'fanout' )
message =  ' ' .join(sys.argv[ 1: ]) or  "info: Hello World!"
channel.basic_publish(exchange= 'logs' ,
     routing_key= '' ,
     body=message)
print  " [x] Sent %r"  % (message,)
connection.close()

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
     host= 'localhost' ))
channel = connection.channel()
channel.exchange_declare(exchange= 'logs' ,
     type= 'fanout' )
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange= 'logs' ,
     queue=queue_name)
print  ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
     print  " [x] %r"  % (body,)
channel.basic_consume(callback,
     queue=queue_name,
     no_ack=True)
channel.start_consuming()

试运行:
    Consumer1:$ python receive_logs.py > logs_from_rabbit.log  //追加到文件
    Consumer2:python receive_logs.py //输出到屏幕
    Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
3.png

二、Routing路由(Direct)
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name)  
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
    使用一个key来创建binding :
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name,  
    routing_key='black') 
对于fanout的exchange来说,这个参数是被忽略的。

2.Direct Exchange
通过Bindings key完全匹配
图Direct路由模型
Direct.png

exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3.多重绑定(Multiple Bindings)
多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
multi.png

4.生产者和消费者
生产者:
===========================================================================

1
2
3
4
5
6
7
8
channel.exchange_declare(exchange= 'direct_logs' ,  
     type= 'direct' )  
//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
publish:
channel.basic_publish(exchange= 'direct_logs' ,  
     routing_key=severity, 
     body=message)  
//涉及三种severity: 'info' 'warning' 'error' .

消费者:
===========================================================================

1
2
3
4
5
6
7
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
for severity in severities:  
     channel.queue_bind(exchange= 'direct_logs' ,  
         queue=queue_name,  
         routing_key=severity) 
//queue需要绑定severity

5.最终版本
图:direct_2
direct_2.png

emit_log_direct.py 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
     host= 'localhost' ))
channel = connection.channel()
channel.exchange_declare(exchange= 'direct_logs' ,
     type= 'direct' )
severity = sys.argv[ 1 ] if len(sys.argv) >  1  else  'info'
message =  ' ' .join(sys.argv[ 2: ]) or  'Hello World!'
channel.basic_publish(exchange= 'direct_logs' ,
     routing_key=severity,
     body=message)
print  " [x] Sent %r:%r"  % (severity, message)
connection.close()

receive_logs_direct.py: 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python  
import pika  
import sys  
connection = pika.BlockingConnection(pika.ConnectionParameters(  
     host= 'localhost' ))  
channel = connection.channel()  
channel.exchange_declare(exchange= 'direct_logs' ,  
     type= 'direct' )  
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
severities = sys.argv[ 1: ]  
if not severities:  
     print  >> sys.stderr,  "Usage: %s [info] [warning] [error]"  % \  
         (sys.argv[ 0 ],)  
     sys.exit( 1 )  
for severity in severities:      
     channel.queue_bind(exchange= 'direct_logs' ,  
         queue=queue_name,  
         routing_key=severity)  
print  ' [*] Waiting for logs. To exit press CTRL+C'  
def callback(ch, method, properties, body):  
     print  " [x] %r:%r"  % (method.routing_key, body,)  
channel.basic_consume(callback,  
     queue=queue_name,  
     no_ack=True)  
channel.start_consuming()

===========================================================================
试运行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log 
    //把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error  
    //打印所有log到屏幕    

三、主题路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
    对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
    * (星号) 代表任意 一个单词
    # (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
    第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":
    Q1 感兴趣所有orange颜色的动物
    Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange
    由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
    如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
    如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2.代码实现
The code for emit_log_topic.py:
========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
     host= 'localhost' ))
channel = connection.channel()
channel.exchange_declare(exchange= 'topic_logs' ,
     type= 'topic' )
routing_key = sys.argv[ 1 ] if len(sys.argv) >  1  else  'anonymous.info'
message =  ' ' .join(sys.argv[ 2: ]) or  'Hello World!'
channel.basic_publish(exchange= 'topic_logs' ,
     routing_key=routing_key,
     body=message)
print  " [x] Sent %r:%r"  % (routing_key, message)
connection.close()

========================================================================

The code for receive_logs_topic.py:     
========================================================================    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
     host= 'localhost' ))
channel = connection.channel()
channel.exchange_declare(exchange= 'topic_logs' ,
     type= 'topic' )
     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[ 1: ]
if not binding_keys:
     print  >> sys.stderr,  "Usage: %s [binding_key]..."  % (sys.argv[ 0 ],)
     sys.exit( 1 )
for binding_key in binding_keys:
     channel.queue_bind(exchange= 'topic_logs' ,
         queue=queue_name,
         routing_key=binding_key)
print  ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
     print  " [x] %r:%r"  % (method.routing_key, body,)
channel.basic_consume(callback,
     queue=queue_name,
     no_ack=True)
channel.start_consuming()

    
3.运行和结果
    python receive_logs_topic.py "#"  //接收所有的log
    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log
    python receive_logs_topic.py "*.critical"  //仅仅接收critical的log: 
    python receive_logs_topic.py "kern.*" "*.critical"  //可以创建多个绑定: 
    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer产生一个log:"kern.critical" type: 
    
参考:    
http://www.rabbitmq.com/tutorials/tutorial-three-python.html










本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7天前
|
消息中间件 存储 中间件
精通 RabbitMQ 系列 01
精通 RabbitMQ 系列 01
32 0
|
7天前
|
消息中间件 存储 网络协议
精通 RabbitMQ 系列 02
精通 RabbitMQ 系列 02
24 0
|
2月前
|
消息中间件 存储 负载均衡
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理软件,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了一种可靠的、强大的、灵活的消息传递机制,使得不同应用程序或组件之间可以轻松地进行通信。
23 0
|
8月前
|
消息中间件 存储
RabbitMq
RabbitMq
67 0
|
9月前
|
消息中间件
1. 什么是 RabbitMQ?
1. 什么是 RabbitMQ?
45 0
|
10月前
|
消息中间件 存储 JSON
关于RabbitMQ
MQ是一种应用程序键一步通讯的技术,MQ是消息队列的缩写(Message Queue) 在MQ中,消息由一个应用程序发送到一个称为队列的中间件中,接着被中间件存储,并最终被另一个或多个消费者应用程序读取和处理; MQ组成:消息——生产者——队列——中间件——消费者!
55 0
|
10月前
|
消息中间件 网络协议 Java
RabbitMQ(3)
RabbitMQ(3)
|
10月前
|
消息中间件 网络协议 Java
RabbitMQ(1)
RabbitMQ(1)
|
10月前
|
消息中间件 Java
RabbitMQ(2)
RabbitMQ(2)
|
10月前
|
消息中间件 存储 缓存
初识RabbitMQ
初识RabbitMQ
85 1

相关实验场景

更多