一 、说明
使用Python操作RabbitMQ的书籍以及例子,少之又少。翻遍了网上所有的例子,发现十个有9个半不能运行的,这半个你还得修改。
原因很简单,要么例子的Python版本太低了,要么例子的RabbitMQ的版本太低了。所以造成了一系列文字。
让我很痛苦,决定下笔写一篇关于这个的文章。
Python3.x+RabbitMQ+Docker+Centos
二、安装RabbitMQ
为了此篇文章只突出Python+RabbitMQ,就单独写了一篇文章给大家:
三、编写操作的代码
6种模式
这里我们使用pika来操作RabbitMQ
pip install pika
(一)、简单的RabbitMQ消息队列(不安全,不能持久化)
发送端
send.py
import pika #你的RabbitMQ的地址 host = "替换成自己的RabbitMQ服务器的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #指定队列的名字 queueName="hello" #说明使用的队列,如果没有会自动创建 channel.queue_declare(queueName) #发送的msg消息 msg = "Hello TrueDei" #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key=queueName, body=msg) print(" [x] Sent 'Hello TrueDei'") connection.close()
发送结果:
可以再web上看到,也收到了
接收端
resv.py
import pika #你的RabbitMQ的地址 host = "替换成自己的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #指定队列的名字 queueName="hello" #说明使用的队列,如果没有会自动创建 channel.queue_declare(queueName) #将ReceivedMessage添加到队列中,同时替换通道实现。 #返回的结果,会返回到这里面,如果有兴趣可以点开basic_consume方法去看看源代码 def callback(ch, method, properties, body): print(" [x] Received %r" % body) #从服务器队列消费。 # no_ack=True ,是需要是否确定消息的处理了,告诉服务端 # no_ack=False ,默认是False,可以不写 channel.basic_consume(queueName,callback,False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
Python收消息:
注意:接收到处于死循环,一直在等待接收,发送一个数据,就收到一个数据
(二)、深入理解消息队列
1、当有1个生产者,n个消费者时
基于上面的代码不做任何修改 把上面的消费者开N个就是想要的结果。 如下: 运行3个消费者,生产者生成的消息队列依次被接收者接收
2、处理消息安全问题(缺持久化)
基于上面代码,如果消费者出问题了,消息发送将无人接收。 即便再次启动消费者,之前发生的消息将一直存在队列中
生产者
send_msg_safe.py
import pika import time #你的RabbitMQ的地址 host = "替换成自己的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() message = "Hello World! %s" % time.time() print("产生的消息:" ,message) channel.basic_publish(exchange='', routing_key='task_queue', # body=message, # properties=pika.BasicProperties( # delivery_mode=2, # make message persistent # ) ) print(" [x] Sent %r" % message) connection.close()
消费者
recv_msg_safe.py
import pika, time #你的RabbitMQ的地址 host = "替换成自己的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) # 再进行手动确认 channel.basic_consume('task_queue',callback,False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
存在问题:
在于消费者:消费者处理好的消息,需要给服务端回信息 # no_ack=True ,是需要是否确定消息的处理了,告诉服务端 # no_ack=False ,默认是False,可以不写 # callback 函数后面需要添加 ch.basic_ack(delivery_tag=method.delivery_tag) # 再进行手动确认
3、处理消息安全且持久化
基于上面的代码,如果重启了rabbitmq,则存在的消息就消失。需要做到消息持久化。(消息安全且持久化)
生产者
send_msg.py
import pika #你的RabbitMQ的地址 host = "替换成自己的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() # durable=True:在代理重新启动后仍然存在 channel.queue_declare(queue='hello10',durable=True) channel.basic_publish(exchange='', routing_key='hello10', body='Hello World!', properties = pika.BasicProperties( #消息持久化 delivery_mode = 2,) ) print(" [x] Sent 'Hello World!'") # 关闭队列 connection.close()
消费者
recv_msg.py
import pika,time #你的RabbitMQ的地址 host = "替换成自己的IP" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() channel.queue_declare(queue='hello10',durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(10) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume('hello10',callback,False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
存在问题:
问题再于生产者的消息需要被持久化 durable=True:功能是,告诉服务,重启后消息依然存在 channel.queue_declare(queue='hello10',durable=True) properties = pika.BasicProperties( delivery_mode = 2,) )
(三)、消息的发布、订阅以及广播模式
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里, 但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了, Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息 fanout: 所有bind到此exchange的queue都可以接收消息 direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 表达式符号说明:#代表一个或多个字符,*代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等 *.a会匹配a.a,b.a,c.a等 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
三种最常用的交换机
direct:“直接连接交换机” topic:“主题路由匹配交换机” fanout:“无路由交换机”
1、fanout交换类型
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue, 并最终被两个消费者(C1与C2)消费。
2、direct交换类型
direct类型的Exchange路由规则也很简单, 它会把消息路由到那些binding key与routing key完全匹配的Queue中。
以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
3、topic交换类型
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key, 但这种严格的匹配方式在很多情况下不能满足实际业务需求。 topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似, 也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
1、广播模式(fanout,直接连接交换机),发送一个消息,无论有多少接收端,只要在,就能收到,不在就不能收到
生产者
send.py
import pika #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #设置交换器的名字和要使用的交换类型 channel.exchange_declare(exchange='logs',exchange_type='fanout') #设置消息 message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', #不指定,留空 body=message) print(" [x] Sent %r" % message) connection.close()
消费者
recv.py
import pika #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #设置交换器的名字和要使用的交换类型 channel.exchange_declare(exchange='logs',exchange_type='fanout') # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(queue='',exclusive=True) #获取生成的队列名字 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue_name,callback, True) channel.start_consuming()
2、组播模式(direct,直接连接交换机)。
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
send端根据关键字指定发送内容
可发送info,warning,error
recv端根据关键字指定接收内容
可接收info,warning,error
生产者
send.py
import pika import sys #sys模块再运行的时候,可以接受用户输入的值 #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #如果运行(python test.py hello) 的话,sys.argv[1]就可以拿到这个hello这个词 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' #如果运行的时候,后面跟了数据,就替换info这个值,否则默认就是info # severity = 'info' #如果运行的时候,后面跟了数据,就替换info这个值,否则默认就是info #消息体 message = ' '.join(sys.argv[2:]) or 'Hello World!' #拿到后面跟的第二个值,默认是Hello World! #绑定交换的类型,绑定队列,绑定发送的消息 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者
recv.py
import pika import sys #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='',exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( queue_name, 'direct_logs', routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue_name, callback, True) channel.start_consuming()
结果:建议放大看
说明:
在上图中可以看到运行的命令:
send.py 执行示例,需说明发送的内容级别 根据Python代码可以给出公式:python send.py 模式 发送的消息 python send.py info TestInfo1 python send.py warning TestWarning1 python send.py error TestError1 recv.py 执行示例,需说明接收代码内容 执行示例,需说明发送的内容级别 根据Python代码可以给出公式:python recv.py 模式 python recv.py info python recv.py warning python recv.py error
3、组播模式之基于组播模式(topic,主题路由匹配交换机),实现更细致的划分不同种类的信息。
形象点可以如下图:
生产者
topic_send.py
import pika import sys #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #指定使用的交换类型和交换器 channel.exchange_declare(exchange='topic_logs',exchange_type='topic') #如果有输入,就拿到输入的第一个数据为队列,否则默认为:anonymous.info(匿名的,当然了,可以随便修改哦) routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #如果输入的数据,存在第二个,那么就把第二个当作消息,发送出去。否则默认消息就是:Hello World! message = ' '.join(sys.argv[2:]) or 'Hello World!' #绑定交换的类型,绑定队列,绑定发送的消息 channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) #提示 print(" [x] Sent %r:%r" % (routing_key, message)) #关闭 connection.close() # python topic_send.py python.error test 发送了一条python的错误信息,错误内容为test # python topic_send.py mysql.info hello 发送了一条mysql的信息,信息内容为hello
消费者
topic_recv.py
import pika import sys #你的RabbitMQ的地址 host = "你的RabbitMQ的地址" #RabbitMQ端口号 post = 5672 #创建的账号,当然了也可以使用默认的guest账号,密码也是guest username = "admin" #账号的密码 password = "123456" # 创建一个有凭证的新实例 credentials = pika.PlainCredentials(username, password) # 使用凭证连接RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials)) #声明一个管道 channel = connection.channel() #指定使用的交换类型和交换器 channel.exchange_declare(exchange='topic_logs',exchange_type='topic') #自动产生一个队列,exclusive=True:自动销毁 result = channel.queue_declare(queue='',exclusive=True) #获取自己产生的队列名字 queue_name = result.method.queue #拿到输入的第一个数为key,就在这个上面监听 binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue_name,callback,True) channel.start_consuming() # python topic_recvive.py # 使用" # "号收所有 # python topic_recvive.py mysql.* 使用"mysql.* "号收来自mysql的信息 # python topic_recvive.py mysql.error.* 使用"mysql.error.* "号收来自mysql的错误信息 # python topic_recvive.py *.django.* 使用"*.django.* "号收来自所有Django的信息
测试结果1
测试结果2
还有监听全部的可以收到
测试结果3
总结:
发送端1
python topic_send.py mysql.info ThisMysqlInfoMsg [x] Sent 'mysql.info':'ThisMysqlInfoMsg' python topic_send.py mysql.error.Insert ThisMysqlErrorInsertMsg [x] Sent 'mysql.error.Insert':'ThisMysqlErrorInsertMsg' python topic_send.py python.data Pythonaaaa [x] Sent 'python.data':'Pythonaaaa'
接收端1
python topic_recv.py # [*] Waiting for logs. To exit press CTRL+C [x] 'mysql.info':b'ThisMysqlInfoMsg' [x] 'mysql.error.Insert':b'ThisMysqlErrorInsertMsg' [x] 'python.data':b'Pythonaaaa'
接收端2
python topic_recv.py mysql.* [*] Waiting for logs. To exit press CTRL+C [x] 'mysql.info':b'ThisMysqlInfoMsg'
接收端3
python topic_recv.py mysql.error.* [*] Waiting for logs. To exit press CTRL+C [x] 'mysql.error.Insert':b'ThisMysqlErrorInsertMsg'
接收端4
python topic_recv.py python.* [*] Waiting for logs. To exit press CTRL+C [x] 'python.data':b'Pythonaaaa'
四、问题集整理以及常见的错误
1、错误码403
账号密码错误
2、错误码404
出现404,大多数就是连接的地址有问题,或者断网了也会造成
3、错误码405
出现这个405,肯定是有已经在运行的程序了,被占用了。要先结束掉,才可以运行这个
4、新版与老版本的常见问题
第一处:关于callback与queue_name的位置
老版本:callback与queue_name的位置换了
channel.basic_consume( callback,queue_name ,True)
新版本:callback与queue_name的位置换了
channel.basic_consume( queue_name,callback,True)
第二处:关于队列名
老版本:可以不指定队列,就会自动生成
result = channel.queue_declare(exclusive=True)
新版本:必须指定一个空的就行
result = channel.queue_declare(queue='',exclusive=True)
参考文章
https://blog.csdn.net/fwk19840301/article/details/92986072
https://blog.csdn.net/banzhi8397/article/details/101392965