RabbitMQ消息队列
RabbitMQ具有更加灵活的路由规则,且拥有消息确认机制,所以RabbitMQ比较适合作为任务池和指令池的载体。下面将对RabbitMQ的工作原理及常用场景进行深入讲解。
RabbitMQ的内部可以分成两部分:交换机部分和消息队列部分。一般情况下,交换机和消息队列都需要手动创建,且需要使用绑定键以绑定交换机和消息队列的关系。交换机和消息队列的绑定关系可以是多对多的,绑定键可以标识多个绑定关系。
当生产者程序发送消息时,需要指定交换机和路由键。生产者程序所发送的消息会先被指定交换机接收,之后RabbitMQ会根据三个因素把消息发送到相关的消息队列中。判断的三个因素为该交换机的类型(直发、广播等)、该交换机与消息队列的绑定关系,以及生产者程序指定的路由键与绑定键是否匹配。当消息匹配不到相关消息队列时,会被丢弃。
消费者程序监听指定消息队列,当指定消息队列接收到信息后,RabbitMQ会把消息发送到该消费者程序中。默认情况下,消息需要被确认(消费者程序向RabbitMQ发送消息确认信息)后才会被删除。在消息还没被确认的情况下,如果消费者程序异常退出(如断开连接),则该消息会被重新放回消息队列中。当然,在监听消息队列时如果设置了自动确认,则消息会被自动删除。另外,多个消费者程序可以监听同一个消息队列,但一个消息只会被一个消费者程序获取。RabbitMQ的工作原理如图5.19所示。
图5.19 RabbitMQ的工作原理
1.RabbitMQ的基本操作
RabbitMQ的基本操作如代码5.8所示,包括创建与销毁连接、创建与销毁交换机、创建与销毁消息队列、绑定交换机和消息队列、发送消息及接收消息。值得一提的是,无论是生产者程序还是消费者程序,都最好在发送消息或接受消息前做一下创建交换机、创建消息队列和绑定交换机与消息队列的动作,避免因为消息队列或交换机不存在而发生的错误。
说明:示例代码5.8是使用C++编写的,除了C++以外,RabbitMQ还支持其他语言,如Python、Java、Ruby、PHP等。虽然实际使用的开发语言不尽相同,但RabbitMQ的调用方式是大同小异的。
代码5.8 RabbitMQ的基本操作
//RabbitMQ相关头文件,需要先安装相关的库(librabbitmq-dev)
#include <amqp.h>
#include <amqp_tcp_socket.h>
//定义连接变量,connecton为连接,channel为连接的通道,与RabbitMQ通信都需要加上
二者
amqp_connection_state_t connection = amqp_new_connection();
int channel = 1;
//创建连接
//打开连接,并登录RabbitMQ,需要指定RabbitMQ的IP地址、端口、账号和密码
//"/"为默认的虚拟主机,一个RabbitMQ服务可以开设多个虚拟主机,用作服务隔离
//虚拟主机需要设置开通
amqp_socket_open(socket, "IP地址", "端口");amqp_socket_t *socket = amqp_tcp_socket_new(connection);
amqp_login(connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"账号", "密码");
//建立通道,channel为通道序号。一个程序可以打开多个通道以达到建立多个连接的效果
amqp_channel_open(connection, channel);
//销毁连接
//关闭通道和关闭连接
amqp_channel_close(connection, channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(connection, AMQP_REPLY_SUCCESS);
//创建与销毁交换机
//创建交换机,需要设置交换机名称和交换机类型
//交换机类型包括direct(直发,默认类型)、fanout(广播)和topic(主题)
amqp_exchange_declare(connection, channel, amqp_cstring_bytes("交换机
名称"),
amqp_cstring_bytes("交换机类型"), 0, 0, 0, 0,
amqp_empty_table);
//销毁交换机,需要设置交换机名称
amqp_exchange_delete(connection, channel, amqp_cstring_bytes("交换机
名称"), 1);
//创建与销毁消息队列
//创建消息队列
const char* queueNameStr = "消息队列名称";
//消息队列是否持久化(重启后消息仍不丢失),0/1对应false/true
amqp_boolean_t durable = 1;
//消息队列是否在断开连接后自动删除,0/1对应false/true
amqp_boolean_t autodelete = 0;
amqp_queue_declare(connection, channel, amqp_cstring_bytes
(queueNameStr), 0,
durable, 0, autodelete, amqp_empty_table);
//销毁消息队列
amqp_queue_delete(connection, channel, amqp_cstring_bytes
(queueNameStr), 1, 0);
//绑定与解绑
//绑定交换机与消息队列,交换机和消息队列可以绑定多个绑定键
const char* queueNameStr = "消息队列名称";
const char* exchange = "交换机名称";
//当交换机类型为fanout(广播)时不生效
const char* bindingkey = "绑定键名称";
amqp_queue_bind(connection, channel, amqp_cstring_bytes(queueNameStr),
amqp_cstring_bytes(exchange), amqp_cstring_bytes
(bindingkey),
amqp_empty_table);
//解除绑定
amqp_queue_unbind(connection, channel, amqp_cstring_bytes(queue
NameStr),
amqp_cstring_bytes(exchange), amqp_cstring_bytes
(bindingkey),amqp_empty_table);
//发送消息
const char* exchange = "交换机名称";
const char* routingkey = "路由键名称";
const char* messagebody = "发送的消息";
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应//消息主体的类型
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; //持久化消息
//发送消息
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
//获取消息与消息确认
//是否自动确认消息,0/1对应false/true,0代表需要手动确认
amqp_boolean_t noack = 0;
const char* queueNameStr = "消息队列名称";
//订阅消息队列,这个函数只需要调用一次,即使获取多次消息
amqp_basic_consume(connection, channel, amqp_cstring_bytes(queue
NameStr),
amqp_empty_bytes, 0, noack, 0,amqp_empty_table);
//获取消息,获取消息可以多次调用,无消息时会自动阻塞
amqp_envelope_t envelope; //定义接收消息的变量
amqp_maybe_release_buffers(connection); //清理buffersamqp_consume_message(connection, &envelope, NULL, 0); //获取消息
envelope.message.body.bytes; //消息主体的开始指针(char *)envelope.message.body.len; //消息主体的长度
//确认消息,如果订阅时设置为自动确认,则此处不需要调用
//envelope.delivery_tag为所获取信息的标识
amqp_basic_ack(connection, channel, envelope.delivery_tag, 0);
2.场景一:把消息发送到指定消息队列
当消息只需要被发送到一个指定消息队列时,可以直接使用默认的交换机(不需要另外创建交换机)。发送消息时,交换机设置为默认交换机,路由键设置为消息队列的名称,消息就可以发送到指定消息队列了。另外,默认的交换机与消息队列是自动绑定的,不需要额外手动绑定。把消息发送到指定消息队列的流程如图5.20所示。
图5.20 把消息发送到指定消息队列的流程
生产者程序利用默认交换机把消息发送到指定消息队列的代码如代码5.9所示,其中,默认交换机是默认存在的,消息队列创建后会自动与默认交换机绑定。
代码5.9 生产者程序利用默认交换机把消息发送到指定消息队列
const char* exchange = ""; //默认交换机名称(名称为空字符)
//路由键设置为要发送到的消息队列名称
const char* routingkey = "要发送到的消息队列名称";
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应props.content_type = amqp_cstring_bytes("text/plain"); //消息主体的类型
props.delivery_mode = 2; //持久化消息
//发送消息
const char* messagebody = "发送的消息";
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
默认情况下,当多个消费者程序监听同一个消息队列时,消息队列接收到消息后,会把该消息发送给其中一个消费者程序,不论该消费者程序是否还有未处理完的消息(未确认的消息)。这样的分发机制可能会造成有的消费者程序积压太多消息,而有的消费者程序只有几个消息的情况。因此,当多个消费者程序监听同一个消息队列时,一般需要保证分发公平。分发公平需要每个消费者程序在没确认消息之前,不接收新的消息,这个设置如代码5.10所示。
代码5.10 消费者程序在没确认消息之前不接收新消息的设置//设置在没确认消息之前不接收新消息,需要在订阅消息队列前设置
uint16_t prefetchCount = 1;
amqp_basic_qos(connection, channel, 0, prefetchCount, 0);
//订阅消息队列,只调用一次即可
amqp_boolean_t noack = 0; //需要设置为消息需要手动确认
const char* queueNameStr = "消息队列名称";
amqp_basic_consume(connection, channel, amqp_cstring_bytes
(queueNameStr),
amqp_empty_bytes, 0, noack, 0,amqp_empty_table);
//获取消息,可以多次调用,无消息时会自动阻塞
amqp_envelope_t envelope; //定义接收消息的变量
amqp_maybe_release_buffers(connection); //清理buffersamqp_consume_message(connection, &envelope, NULL, 0); //获取消息
envelope.message.body.bytes; //消息主体的开始指针(char *)envelope.message.body.len; //消息主体的长度
//确认消息,确认消息后,消息队列才会向消费者程序发送新的消息
//envelope.delivery_tag为所获取信息的标识
amqp_basic_ack(connection, channel, envelope.delivery_tag, 0);
3.场景二:把消息路由到消息队列
对于前面介绍的场景一(把消息发送到指定消息队列)而言,生产者程序需要知道消费者程序监听的消息队列名称。这样会产生一种强关联,一旦消息队列名称不固定(临时的消息队列),则会导致生产者程序不知道该把消息发送到哪个消息队列的问题发生。为此,RabbitMQ提供了消息路由的模式,生产者程序发送消息时指定交换机和路由键(无须关心具体消息队列名称),交换机收到消息后,根据路由键寻找与之匹配的消息队列并把消息发送到这些消息队列中。
如果要使用RabbitMQ的消息路由模式,首先需要创建一个直发模式的交换机(默认类型),然后把相关的消息队列与该交换机进行绑定(通过绑定键绑定)。生产者程序发送消息时,需要指定交换机和路由键。交换机接收到消息后,会根据路由键与绑定键进行匹配,匹配成功的消息队列会收到消息。把消息路由到消息队列的流程如图5.21所示。其中,多个消息队列可以用相同的绑定键绑定,如果路由键与多个绑定键匹配,则消息会发到多个消息队列当中。
如果没有匹配的消息队列,消息会被丢弃。另外,同一交换机和消息队列可以使用多个绑定键建立关系。
图5.21 把消息路由到消息队列的流程
生产者程序利用直发模式的交换机把消息路由到消息队列的代码如代码5.11所示,其中,消息队列与交换机的绑定一般交由消费者程序完成,这样能让消费者程序更灵活地使用消息队列。另外,消费者程序只需要正常监听消息队列即可。
代码5.11 生产者程序利用直发模式的交换机把消息路由到消息队列
const char* exchange = "交换机名称"; //交换机名称
const char* routingkey = "路由键名称"; //路由键名称
const char* bindingkey = "绑定键名称"; //绑定键名称
//创建直发模式的交换机,指定交换机类型为direct(直发)
amqp_exchange_declare(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes("direct"), 0, 0, 0, 0, amqp_empty
_table);
//绑定交换机与消息队列,绑定键需要被指定,同一交换机和消息队列可以使用多个绑定键建立
关系
const char* queueNameStr = "消息队列名称";
amqp_queue_bind(connection, channel, amqp_cstring_bytes(queueNameStr),
amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应props.content_type = amqp_cstring_bytes("text/plain"); //消息主体的类型
props.delivery_mode = 2; //持久化消息
//发送消息,路由键需要被指定,即routingkey需要与某个bindingkey相同
const char* messagebody = "发送的消息";
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
4.场景三:把消息按主题分发到消息队列
对于前面介绍的场景二(把消息路由到消息队列)而言,有一个限制,即消息队列的绑定键需要与消息的路由键精准匹配后,消息队列才能接收到消息。那么,是否可以只要相似的绑定键与路由键匹配后(模糊匹配),对应的消息队列就能收到消息呢?这样可以大大提升消息发送的灵活度。为此,RabbitMQ提供了主题分发的模式,生产者程序发送消息时指定交换机和路由键,交换机收到消息后,会根据路由键寻找与之模糊匹配的消息队列并把消息发送到这些消息队列中。
如果要使用RabbitMQ的主题分发模式,首先需要创建一个主题模式的交换机,然后把相关的消息队列与该交换机进行绑定(通过绑定键绑定,绑定键可以使用通配符)。生产者程序发送消息时,需要指定交换机和路由键。交换机接收到消息后,会根据路由键与含有通配符的绑定键进行模糊匹配,匹配成功的消息队列会收到消息。把消息路由按主题分发到消息队列的流程如图5.22所示。其中,同一交换机和消息队列可以使用多个绑定键建立关系,如果匹配多个绑定键,则该消息队列也只会收到一条信息。
说明:主题分发模式的绑定键和路由键,都是以“.”分割字符的,如“test.mission”、“zoo.tiger.lily”等。另外,绑定键中可以使用两种通配符,即“*”和“#”,“*”代表任意一个单词,“#”代表0个或多个单词,单词是以“.”分割的。例如,A消息队列的绑定键是“zoo.#”,B消息队列的绑定键是“*.lily”,路由键是“zoo.tiger.lily”,则A消息队列收到消息,而B消息队列收不到消息,因为B消息队列的绑定键使用的是“*”,只能代表一个单词,而路由键“zoo.tiger.lily”中有三个单词(单词以“.”分割)。
图5.22 把消息按主题分发到消息队列的流程
生产者程序利用主题模式的交换机把消息按主题分发到消息队列的代码如代码5.12所示,其中,消息队列与交换机的绑定一般交由消费者程序完成,这样能让消费者程序更灵活地使用消息队列。另外,消费者程序只需要正常监听消息队列即可。
代码5.12 生产者程序利用主题模式的交换机把消息按主题分发到消息队列
const char* exchange = "交换机名称"; //交换机名称
const char* routingkey = "路由键名称"; //路由键名称,如“zoo.tiger.lily”
const char* bindingkey = "绑定键名称"; //绑定键名称,如“zoo.#”
//创建主题模式的交换机,指定交换机类型为topic(主题)
amqp_exchange_declare(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes("topic"), 0, 0, 0, 0, amqp_empty_
table);
//绑定交换机与消息队列,绑定键需要被指定,同一交换机和消息队列可以使用多个绑定键建立
关系
const char* queueNameStr = "消息队列名称";
amqp_queue_bind(connection, channel, amqp_cstring_bytes(queueNameStr),
amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应props.content_type = amqp_cstring_bytes("text/plain"); //消息主体的类型
props.delivery_mode = 2; //持久化消息
//发送消息,路由键需要被指定
const char* messagebody = "发送的消息";
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
5.场景四:把消息广播到多个消息队列
有些时候,生产者程序发送的消息需要发送到多个消息队列中,且希望发送方式是直接的,不需要考虑匹配关系。RabbitMQ为这种场景提供了一种便捷的广播模式,生产者程序发送消息时只需要指定交换机,无须指定路由键,交换机收到消息后,会自动把消息发送到全部与该交换机有绑定关系的消息队列中。
如果要使用RabbitMQ的广播模式,首先需要创建一个广播模式的交换机,然后把相关消息队列与该交换机进行绑定(与广播模式的交换机绑定时不需要指定绑定键)。生产者程序发送消息时,只需要指定交换机即可,路由键无须指定。广播模式的交换机在接收到消息后,会自动把消息发送到与该交换机绑定的全部消息队列上。把消息广播到多个消息队列的工作流程如图5.23所示。
图5.23 把消息广播到多个消息队列的流程
生产者程序利用广播模式的交换机把消息广播到多个消息队列的代码如代码5.13所示,而消费者程序只需要正常监听消息队列即可。另外,交换机与消息队列的绑定可以交由各自的消费者程序完成,这样生产者程序就可以无须关心具体广播到哪些消息队列。
代码5.13 生产者程序利用广播模式的交换机把消息广播到多个消息队列
const char* exchange = "交换机名称"; //交换机名称
const char* routingkey = ""; //路由键不需要设置(名称设置为空字符)
const char* queueNameStr = "消息队列名称"; //消息队列名称const char* bindingkey = ""; //绑定键不需要设置(名称设置为空字符)
const char* messagebody = "发送的消息";
//创建广播模式的交换机,指定交换机类型为fanout(广播)
amqp_exchange_declare(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes("fanout"), 0, 0, 0, 0, amqp_empty
_table);
//绑定交换机与消息队列,绑定键无须指定,即bindingkey为空字符
amqp_queue_bind(connection, channel, amqp_cstring_bytes(queueNameStr),
amqp_cstring_bytes(exchange), amqp_cstring_bytes
(bindingkey),
amqp_empty_table);
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应props.content_type = amqp_cstring_bytes("text/plain"); //消息主体的类型
props.delivery_mode = 2; //持久化消息
//发送消息,路由键无须指定,即routingkey为空字符
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
6.场景五:消息回调
有些时候,消息发送的程序需要得到消息处理的结果后才能继续执行任务,但消息队列本身是一种异步通信,RabbitMQ也没有提供同步调用的方式(等待消息处理结果),所以需要转变思路,通过异步的方式达到同步通信的效果。
为了应对这种需要消息回调的场景,需要借助额外的临时消息队列。生产者程序在发送消息前,先建立一个临时的消息队列,在消息发送时,把这个临时的消息队列名称也一同发送给指定的消费者程序。发送消息之后,生产者程序需要监听这个临时的消息队列以等待消息处理的结果。消费者程序在处理完消息后,把结果发送到这个临时的消息队列即可。利用临时消息队列等待消息回调的流程如图5.24所示。
注意:虽然这种方式能间接地达到同步调用的效果,但这种方式一般是不被提倡的。因为如果生产者程序在监听临时消息队列时异常退出的话,则可能会造成数据处理不完整,残留中间结果的情况。另外,如果消费者程序需要较长时间处理消息的话,那么生产者程序会进入长时间等待,而长时间等待在一些软件中是不被允许的,例如后端应用程序如果长时间等待,会产生接口超时等错误。
图5.24 利用临时消息队列等待消息回调的流程
消息回调的代码如代码5.14所示,其中,临时消息队列的创建可以不指定消息名称(RabbitMQ会自动生成一个唯一标识),消费者程序发送消息结果时只需要通过默认交换机发送到指定临时消息队列即可。另外,临时消息队列可以设置为自动销毁,生产者程序断开连接后,会自动销毁该消息队列。
代码5.14 消息回调的代码
//生产者程序相关
//创建临时消息队列,无须指定消息队列名称,RabbitMQ会自动生成
amqp_boolean_t durable = 0; //设置消息队列不持久化
amqp_boolean_t autodelete = 1; //设置消息队列断开连接后自动删除
amqp_queue_declare_ok_t *result = amqp_queue_declare(connection, channel,
amqp_empty_bytes, 0, durable, 0,
autodelete,
amqp_empty_table);//自动生成的消息队列名称
amqp_bytes_t queuename = amqp_bytes_malloc_dup(result->queue);
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG|
AMQP_BASIC_REPLY_TO_FLAG; //新增一个标识
//消息主体的类型
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; //持久化消息
props.reply_to = queuename; //记录临时消息队列名称
//发送消息
const char* exchange = "交换机名称";
const char* bindingkey = "绑定键名称";
const char* messagebody = "发送的消息";
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
//监听临时消息队列,获取消息结果
amqp_envelope_t envelope; //定义接收消息的变量
amqp_boolean_t noack = 1; //自动确认消息
amqp_basic_consume(connection, channel, queuename, amqp_empty_bytes,
0, noack,
0,amqp_empty_table); //订阅消息队列
amqp_maybe_release_buffers(connection); //清理buffersamqp_consume_message(connection, &envelope, NULL, 0); //获取消息
envelope.message.body.bytes; //消息主体的开始指针(char *)envelope.message.body.len; //消息主体的长度
//消费者程序相关
//获取消息
amqp_envelope_t envelope; //定义接收消息的变量
amqp_maybe_release_buffers(connection); //清理buffersamqp_consume_message(connection, &envelope, NULL, 0); //获取消息
//记录回调消息队列名称
amqp_bytes_t queuename; //回调消息队列名称的变量
//判断并记录回调消息队列名称
if (envelope.message.properties._flags & AMQP_BASIC_REPLY_TO_FLAG) {
queuename = amqp_bytes_malloc_dup(envelope.message.properties.
reply_to);
}
//设置消息的相关信息
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应//消息主体的类型props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; //持久化消息
//发送消息结果,通过默认交换机发送到指定临时消息队列
const char* exchange = ""; //默认交换机名称(名称为空字符)
amqp_bytes_t routingkey = queuename; //路由键设置为临时消息队列的名称
const char* messagebody = "发送的结果消息";
amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0, &props,
amqp_cstring_bytes(messagebody);
本文给大家讲解的内容是云计算服务架构任务池与指令池的搭建和使用,RabbitMQ消息队列
- 下篇文章给大家讲解的内容是大型网站架构的技术细节:云计算服务架构任务池的搭建和使用
- 感谢大家的支持