在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose功能来实现消息追踪,Firehose可以记录每一次发送或者消费消息的记录,方便使用RabbitMQ的使用者进行调试、排错等。
Firehose的机制是将生产者投递给RabbitMQ的消息,或者是RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为amq.rabbitmq.trace,它是一个topic类型的交换器。发送到这个交换器上的消息的routingKey为publish.exchangename和deliver.queuename。其中exchangename和queuename为实际的交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。
开启Firehose命令:
rabbitmqctl trace_on [-p vhost]
其中[-p vhost]是可选参数,用来指定vhost。
对应的关闭命令为:
rabbitmqctl trace_off [-p vhost]
注意Firehose默认情况处于关闭状态,并且Firehose的状态也是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose开启之后多少会影响服务的性能,因为它会引起额外的消息生成、路由和存储。
下面我们举例说明下Firehose的用法。需要做一下准备工作,确保Firehose处于开启状态,创建7个队列:queue、queue.another、queue1、queue2、queue3、queue4和queue5。之后再创建2个交换器exchange和exchange.another,分别通过绑定键rk和rk.another与queue和queue.another进行绑定。最后将amq.rabbitmq.trace这个关键的交换器与queue1、queue2、queue3、queue4和queue5绑定,详细可以参考下图。
分别用客户端向exchange和exchange.another中发送一条消息“trace test payload.”,然后再用客户端消费队列queue和queue.another中的消息。
此时queue1中有2条消息,queue2中有2条消息,queue3中有4条消息,而queue4和queue5中只有一条消息。在向exchange发送一条消息后,amq.rabbitmq.trace分别向queue1、queue3和queue4发送一条内部封装的消息。同样,在想exchange.another中发送一条消息之后,对应的队列queue1和queue3中会多一条消息。消费队列queue的时候,queue2、queue3和queue5中会多一条消息,消费队列queue.another的时候,queue2和queue3会多一条消息。“publish.#”匹配发送到所有交换器的消息,“deliver.#”匹配消费所有队列的消息,而“#”则包含了“publish.#”和“deliver.#”。
在Firehose开启状态下,当有客户端发送或者消费消息时,Firehose会自动封装相应的消息体,并添加详细的headers属性。对于前面的将“trace test payload.”这条消息发送到交换器exchange来说,Firehore会将其封装成如下的内容:
在消费queue时,会将这条消息封装成如下的内容:
headers中的exchange_name表示发送此条消息的交换器;routing_keys表示与exchange_name对应的路由键列表;properties表示消息本身的属性,比如delivery_mode=2表示消息需要持久化处理。