rabbitmq工作队列实现高性能任务的负载分发[python实例]

简介:

更详细的rabbitmq实例文章在我的独立博客里面,有兴趣的朋友可以去看看。 


 http://xiaorui.cc/2014/11/17/%E4%BD%BF%E7%94%A8rabbitmq%E5%B7%A5%E4%BD%9C%E9%98%9F%E5%88%97%E5%AE%9E%E7%8E%B0%E4%BB%BB%E5%8A%A1%E7%9A%84%E8%B4%9F%E8%BD%BD%E5%88%86%E5%8F%91/


烦人的官方化介绍:

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然

先来理解他的一些个专有的名词 ~

概念:

channel:通道,amqp支持一个tcp连接上启用多个mq通信通道,每个通道都可以被作为通信流。


producer:生产者,是消息产生的源头。


exchange:交换机,可以理解为具有路由表的路由规则。


queues:队列,装载消息的缓存容器。


consumer:消费者,连接到队列并取走消息的客户端。


核心思想:在RabbitMQ中,生产者从不直接将消息发送给队列。


事实上,有些生产者甚至不知道消息是否被送到某个队列中去了。生产者只负责将消息送给交换机,而交换机确切地知道什么消息应该送到哪。


bind:绑定,实际上可以理解为交换机的路由规则。每个消息都有一个称为路由键的属性(routing key),就是一个简单的字符串。一个绑定将【交换机,路由键,消息送达队列】三者绑定在一起,形成一条路由规则。


exchange type:交换机类型:


fanout:不处理路由键,转发到所有绑定的队列上


direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发


topic:路由键模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”


关于传说中的性能~

下图是一些个大牛做的综合的测试。

081656454.jpg

对于rabbitmq zeromq,我自己虽然没有专门的测试,但是实际应用中都有应用的。 zeromq的速度不用质疑,确实很快很快,但是他不做数据的存储持久化和可用性,要是client没有开启的话,他照样会把信息pub出去,不管你存不存在。但是rabbitmq就考虑了很多,看下图大家就知道了。

对头,rabbitmq虽然性能不能和zeromq相比,但是在项目中应用还算不错的。


081610883.png




要是不会安装,请看下图

082222887.jpg

下面是他所依赖的包~ 看到这个,大家应该知道他是erlang写的吧 !

082240413.jpg


写的一个小demo 发送端【生产者】,可以想成一个老板,把这次要做的事情到分给大家。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import  pika
import  sys
import  random
def  makepassword(rang  =  "23456789qwertyupasdfghjkzxcvbnm" , size  =  8 ):
     return  string.join(random.sample(rang, size)).replace( " " ,"")
parameters  =  pika.ConnectionParameters(host  =  'localhost'  )
connection  =  pika.BlockingConnection(parameters)
channel  =  connection.channel()
channel.queue_declare(queue  =  'task_queue'  , durable  =  True  )
message = makepassword()
channel.basic_publish(exchange  =  '',
                        routing_key  =  'task_queue'  ,
                        body  =  message,
                        properties  =  pika.BasicProperties(
                           delivery_mode  =  2  # make message persistent
                        ))


接收端【消费者】,可以想成是工人的角色。有几个工人,就几个工人,就几个工人一块干活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import  pika
import  time
connection  =  pika.BlockingConnection(pika.ConnectionParameters(host  =  'localhost'  ))
channel  =  connection.channel()
channel.queue_declare(queue  =  'task_queue'  , durable  =  True  )
print  ' [*] Waiting for messages. To exit press CTRL+C'
                                                                                                                                                                                                                                                                                                                               
def  callback(ch, method, properties, body):
      print  " [x] Received %r"  %  (body,)
      print  " [x] Done"
      ch.basic_ack(delivery_tag  =  method.delivery_tag)
                                                                                                                                                                                                                                                                                                                               
channel.basic_qos(prefetch_count  =  1  )
channel.basic_consume(callback,
                        queue  =  'task_queue'  )
channel.start_consuming()


默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。


从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

1
2
3
4
5
6
7
def callback(ch, method, properties, body):
     print  " [x] Received %r"  % (body,)
     print “正在搞呀”
     print  " [x] Done"
     ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
                       queue= 'hello' )


运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。


如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

1
channel.queue_declare(queue= 'task_queue' , durable=True)


这两条就是为啥rabbitmq比zeromq更可靠的原因 !

还有一个就是针对消费者的分发的策略。

实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。

162748309.png


1
channel.basic_qos(prefetch_count= 1 )


注意:要防止如果所有的消费者都在处理中,则队列中的消息会累积的情况。


对于这样的情况,要不就先用redis中转下,要不就多加work工作者来更多的处理任务。


总结:

   rabbitmq的这种任务派发用法还是不错的,值得使用~







 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1322428,如需转载请自行联系原作者


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
12天前
|
数据采集 存储 监控
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
52 7
21个Python脚本自动执行日常任务(2)
|
15天前
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
42 18
|
16天前
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
29天前
|
运维 监控 网络安全
自动化运维的崛起:如何利用Python脚本简化日常任务
【10月更文挑战第43天】在数字化时代的浪潮中,运维工作已从繁琐的手工操作转变为高效的自动化流程。本文将引导您了解如何运用Python编写脚本,以实现日常运维任务的自动化,从而提升工作效率和准确性。我们将通过一个实际案例,展示如何使用Python来自动部署应用、监控服务器状态并生成报告。文章不仅适合运维新手入门,也能为有经验的运维工程师提供新的视角和灵感。
|
1月前
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【10月更文挑战第36天】在数字化时代,运维工作的效率和准确性成为企业竞争力的关键。本文将介绍如何通过编写Python脚本来自动化日常的运维任务,不仅提高工作效率,还能降低人为错误的风险。从基础的文件操作到进阶的网络管理,我们将一步步展示Python在自动化运维中的应用,并分享实用的代码示例,帮助读者快速掌握自动化运维的核心技能。
81 3
|
1月前
|
运维 监控 Linux
自动化运维:如何利用Python脚本优化日常任务##
【10月更文挑战第29天】在现代IT运维中,自动化已成为提升效率、减少人为错误的关键技术。本文将介绍如何通过Python脚本来简化和自动化日常的运维任务,从而让运维人员能够专注于更高层次的工作。从备份管理到系统监控,再到日志分析,我们将一步步展示如何编写实用的Python脚本来处理这些任务。 ##
|
29天前
|
调度 数据库 Python
掌握Python中的异步编程,提升I/O密集型任务的性能
掌握Python中的异步编程,提升I/O密集型任务的性能
40 0
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
233 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
126 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
518 0
下一篇
DataWorks