RabbitMQ(二) -- Work Queues

简介:

  RabbitMQ使用Work Queues的主要目的是为了避免资源使用密集的任务,它不同于定时任务处理的方式,而是把任务封装为消息添加到队列中。而消息队列正是共享于多个工作者中使用,它们可以随意pop出数据进行处理。

消息的持久化 Message durability

为了保证`rabbitmq`意外重启等原因造成的消息丢失,通过设置消息的durable来实现数据的持久化,但是需要生产者和消费者同时设置持久化才能生效。

需要注意的是,`rabbitmq`并不允许更改已经创建的消息队列的属性,假如之前已经创建过非持久化的hello消息队列,那么会返回一个错误信息。

设置消息队列的可持久化属性(第二个参数):

channel.queue_declare(queue='hello', durable=True)

在消息发送时,需要指定`delivery_mode`来实现消息持久化:

channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))

平均分配 Fair dispatch

`rabbitmq`实现了消息均分的功能,通过设置`basic.qos`方法的`prefetch_count`来实现。它会告诉`rabbitmq`的生产者不要给一个消费者分配过多的任务,也就是说不要在消费者处理完成已经接收到的任务之前分配新的任务。

channel.basic_qos(prefetch_count=1)

其中prefetch_count为可以接受处理的任务个数,如果未达到上限rabbitmq会继续向消费者推送任务。

实例

生产者

复制代码
#!/usr/bin/env python
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for i in range(100):
    message = str(i) + ' Hello World!'
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
    print " [x] Sent %r" % (message,)
    time.sleep(1)
connection.close()
复制代码

消费者

复制代码
#!/usr/bin/env python
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(2)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()
复制代码

 

知识共享许可协议
本文 由 cococo点点 创作,采用 知识共享 署名-非商业性使用-相同方式共享 3.0 中国大陆 许可协议进行许可。欢迎转载,请注明出处:
转载自:cococo点点 http://www.cnblogs.com/coder2012


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件
RabbitMQ消息模型之Work Queues
RabbitMQ消息模型之Work Queues
61 1
RabbitMQ消息模型之Work Queues
|
消息中间件 网络协议 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
87 1
|
8月前
|
消息中间件 存储
RabbitMQ插件详解:rabbitmq_recent_history_exchange【RabbitMQ 七】
RabbitMQ插件详解:rabbitmq_recent_history_exchange【RabbitMQ 七】
154 0
|
8月前
|
消息中间件 存储 Java
RabbitMQ中的Queue是什么?它的特点是什么?
RabbitMQ中的Queue是什么?它的特点是什么?
95 0
|
消息中间件 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
141 1
|
消息中间件 网络协议
三、RabbitMQ Work Queues
三、RabbitMQ Work Queues
三、RabbitMQ Work Queues
|
消息中间件 网络协议
RabbitMQ从入门到进阶(Work Queues)
RabbitMQ从入门到进阶(Work Queues)
137 0
|
消息中间件
【RabbitMQ】——简单队列和work模式
【RabbitMQ】——简单队列和work模式
140 0
【RabbitMQ】——简单队列和work模式
|
消息中间件
Failed to start RabbitMQ broker
Failed to start RabbitMQ broker
441 0
Failed to start RabbitMQ broker
|
消息中间件
RabbitMQ工作模式2 Work queues工作队列模式
RabbitMQ工作模式2 Work queues工作队列模式
154 0
RabbitMQ工作模式2  Work queues工作队列模式

相关实验场景

更多