RabbitMQ详解(二)

简介:

一、RabbitMQ架构浅析

二、收发“hello world”

三、任务分发机制


一、RabbitMQ架构浅析

1.MQ架构图  

1.png  

RabbitMQ Server: 也叫broker server,是一种传输服务,维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。

Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).

一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。


2.channle和connection

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。

对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。 

 

3.ack确认机制

如果Message被某个consumer消费了,那么该Message就会被从queue中移除。//当然也可以让同一到个Message发送到很多Consumer

如果没有被任何consumer消费,那么这个Message会被Cache,不会被丢弃。数据被consumer正确的Consumer收到时,数据就会被从queue中删除

正确的收到:使用ack机制实现//可以显式在程序中去ack,也可以自动的ack。如果数据没有被ack:rabbitmq server会把该消息传输到下一个consumer

如果这个app忘记了ack。那么rabbitmq server不会再发送数据给它。因为server认为这个consumer的处理能力有限

使用ack也可以起到一定的限流的作用:在consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance consumer的load

当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。


4.Reject a message 

有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。


5.Creating a queue

Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

那么谁应该负责创建这个queue呢?是Consumer,还是Producer?

如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。

queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。


6.Exchanges

从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

7.Virtual hosts

每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。

二、收发“hello world”

python --version //用python2的 安装python2-pika 

1.发送消息

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
import  pika
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
host = 'localhost' ))
channel  =  connection.channel()
channel.queue_declare(queue = 'hello' )
channel.basic_publish(exchange = '',
routing_key = 'hello' ,
body = 'Hello World!' )
print  " [x] Sent 'Hello World!'"
connection.close()

=====================================================================

建立连接->创建channel->创建名字为hello的队列->发送消息->关闭连接

从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。

关闭连接

[root@node112 test]# rabbitmqctl list_queues //查看已经发送的队列

Listing queues ...

Hello 1 //被消费后,会变成0

...done.


2.接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
import  pika
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
host = 'localhost' ))
channel  =  connection.channel()
channel.queue_declare(queue = 'hello' )
print  ' [*] Waiting for messages. To exit press CTRL+C'
def  callback(ch, method, properties, body):
print  " [x] Received %r"  %  (body,)
channel.basic_consume(callback,
queue = 'hello' ,
no_ack = True )
channel.start_consuming()

=====================================================================

建立连接->创建channel->创建名字为hello的队列->消费消息->关闭连接

subscribe了。在这之前,需要声明一个回调函数来处理接收到的数据。

3.运行测试

$ python send.py  

[x] Sent 'Hello World!'

send.py 每次运行完都会停止。注意:现在数据已经存到queue里了。接收它:

$ python receive.py  

[*] Waiting for messages. To exit press CTRL+C  

[x] Received 'Hello World!'  


三、任务分发机制

RabbitMQ Server将queue的Message发送给不同的Consumer以处理计算密集型的任务

1.任务分发机制


new_task.py //发送者

====================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python  
import  pika  
import  sys  
connection  =  pika.BlockingConnection(pika.ConnectionParameters(  
host = 'localhost' ))  
channel  =  connection.channel()  
channel.queue_declare(queue = 'task_queue' , durable = True )    
message  =  ' ' .join(sys.argv[ 1 :])  or  "Hello World!"  
channel.basic_publish(exchange = '',  
routing_key = 'task_queue' ,  
body = message,  
properties = pika.BasicProperties(  
delivery_mode  =  2 # make message persistent  
))  
print  " [x] Sent %r"  %  (message,)  
connection.close()

worker.py //收集者

===================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python  
import  pika  
import  time    
connection  =  pika.BlockingConnection(pika.ConnectionParameters(  
host = 'localhost' ))  
channel  =  connection.channel()  
channel.queue_declare(queue = 'task_queue' , durable = True )  
print  ' [*] Waiting for messages. To exit press CTRL+C'    
def  callback(ch, method, properties, body):  
print  " [x] Received %r"  %  (body,)  
time.sleep( body.count( '.' ) )  
print  " [x] Done"  
ch.basic_ack(delivery_tag  =  method.delivery_tag)    
channel.basic_qos(prefetch_count = 1 )  
channel.basic_consume(callback,  
queue = 'task_queue' )
channel.start_consuming()

2.Round-robin循环分发

RabbitMQ对于load较大的情况,可以通过增加consumer和多创建VirtualHost解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Consumer 1: # python worker.py 
Consumer 2: # python worker.py 
Producer:#[root@node 112  test]# for i in First Second Third Fourth Fifth ; do python new_task.py $i messages  ; done
  [x] Sent  'First messages'
  [x] Sent  'Second messages'
  [x] Sent  'Third messages'
  [x] Sent  'Fourth messages'
  [x] Sent  'Fifth messages'
验证:
Consumer 1:
[root@node 112  test]# python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Received  'Second messages'
[x] Done
[x] Received  'Fourth messages'
[x] Done
Consumer 2:
[root@node 112  test]# python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Received  'First messages'
[x] Done
[x] Received  'Third messages'
[x] Done
[x] Received  'Fifth messages'
[x] Done

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。


3.消息确认

no-ack:Consumer收到消息后,RabbitMQ Server会立即把这个message标记为完成,然后从queue中退出 //

ack:数据被接收并且被处理后(RabbitMQ Server收到ACK)才会去安全的删除数据

如果Consumer退出了但是没有发送ack,RabbitMQ会把这个Message发送到下一个Consumer。保证在Consumer异常退出的情况下数据不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

默认情况下,消息确认是打开的(enabled)。

1
2
3
4
5
6
7
def  callback(ch, method, properties, body):
print  " [x] Received %r"  %  (body,)  
time.sleep( body.count( '.' ) )  
print  " [x] Done"  
ch.basic_ack(delivery_tag  =  method.delivery_tag)  
channel.basic_consume(callback,  
queue = 'hello' )

这样即使你通过Ctr-C中断了worker.py,那么Message也不会丢失了,它会被分发到下一个Consumer。

如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged  


4.消息持久化

将queue和Message持久化

队列持久化:channel.queue_declare(queue='hello', durable=True)  

再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:

接下来,需要持久化Message,即在Publish的时候指定一个properties,方式如下:

1
2
3
4
5
6
channel.basic_publish(exchange = '',  
routing_key = "task_queue" ,  
body = message,  
properties = pika.BasicProperties(  
delivery_mode  =  2 # make message persistent  
))

防止数据丢失:

1.Consumer在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。

2.持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.持久化Message,理由同上。

但是数据依然存在丢失的风险。//例如在存储到磁盘的时间过程中

RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

方案:把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。

或者在{系统panic/异常重启/断电}时,给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。


5.公平分发

默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。

那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

通过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:

channel.basic_qos(prefetch_count=1)  


注意:这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

转自:http://blog.csdn.net/column/details/rabbitmq.html

官网:http://www.rabbitmq.com

http://blog.csdn.net/anzhsoft/article/details/19563091











本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051211,如需转载请自行联系原作者
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
存储 缓存 弹性计算
|
5月前
|
人工智能 数据可视化 JavaScript
2.1k star! 抓紧冲,DeepChat:连接AI与个人世界的智能助手的开源项目
DeepChat是一款开源跨平台智能助手工具,采用Apache-2.0协议,已获2.1k星标。它支持Windows/macOS/Linux,兼容20+主流AI服务,如OpenAI、Gemini等,还支持本地部署的开源模型。其核心功能包括全模型兼容架构、智能搜索增强、可视化工具平台(MCP)、多模态呈现及隐私安全设计。DeepChat适用于程序员助手、内容创作、学习研究、数据分析和智能办公五大场景。项目优势在于智能搜索决策、零配置工具链和企业级扩展性,是探索AI应用的理想选择。项目地址:https://github.com/ThinkInAIXYZ/deepchat。
330 0
|
10月前
|
存储 自然语言处理 API
打破文本边界:如何进行多模态RAG评估
一般的检索增强生成(RAG,Retrieval-Augmented Generation)方法主要依赖于文本数据,常常忽略了图像中的丰富信息。那么应该如何解决呢?本文带你了解一下这个模型。
打破文本边界:如何进行多模态RAG评估
|
存储 SQL 数据库
SQL 语言:存储过程和触发器
SQL 语言:存储过程和触发器
263 6
|
Java Linux 容器
JVM内存问题之什么是OOM-Killer,它通常会在什么情况下触发
JVM内存问题之什么是OOM-Killer,它通常会在什么情况下触发
324 2
伪随机序列——m序列及MATLAB仿真
伪随机序列——m序列及MATLAB仿真
1510 2
|
缓存 Java 索引
一文读懂注解的底层原理
一文读懂注解的底层原理
534 1
|
安全 JavaScript 前端开发
CSP(Content Security Policy)可以解决什么问题?
CSP(Content Security Policy)可以解决什么问题?
166 0
|
Kubernetes 安全 数据安全/隐私保护
ACK场景下应用程序安全访问云资源最佳实践
在实际的容器安全实践中,怎么样避免应用程序永久访问密钥。本文会介绍基于云原生的产品能力来实现无AK方案。
524 6
ACK场景下应用程序安全访问云资源最佳实践

热门文章

最新文章