<rabbitmq实践>,讲得比较细,
我也用python实打了一下,
以后再用celery时,理解会更深入。
hello_world_producer.py
#coding=utf-8
import pika, sys
#建立到代理服务器的连接
credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost", credentials=credentials)
conn_broker = pika.BlockingConnection(conn_params)
#获得信道
channel = conn_broker.channel()
#声明交换器
channel.exchange_declare(exchange="hello-exchange",
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
#从命令行获取纯文本消息
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = "text/plain"
#发布消息
channel.basic_publish(body=msg,
exchange="hello-exchange",
properties=msg_props,
routing_key="hola")
hello_world_consumer.py
#coding=utf-8
import pika, sys
#建立到代理服务器的连接
credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost", credentials=credentials)
conn_broker = pika.BlockingConnection(conn_params)
#获得信道
channel = conn_broker.channel()
#声明交换器
channel.exchange_declare(exchange="hello-exchange",
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
#声明队列
channel.queue_declare(queue="hello-queue")
#通过hola路由键将队列和交换器绑定
channel.queue_bind(queue="hello-queue",
exchange="hello-exchange",
routing_key="hola")
# 用于处理传入的消息的函数
def msg_consumer(channel, method, header, body):
# 消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
if body == b"quit":
#信上消费并退出
channel.basic_cancel(consumer_tag="hello-consumer")
channel.stop_consuming()
else:
print(body)
return
channel.basic_consume(msg_consumer,
queue="hello-queue",
consumer_tag="hello-consumer")
#开始消费
channel.start_consuming()