一、消息的可靠性投递
1、消息的可靠性投递
RabbitMq为我们提供了两种方式来控制消息投递的可靠性模式。
- confirm 确认模式,消息从生产者到交换机会返回一个confirmCallback
- return 回退模式,交换机到队列投递失败则会返回一个returnCallback
利用这两种模式可以控制消息投递的可靠性
2、Consumer Ack
消费者有三种确认模式
- 自动确认:acknowledge="none",当消息一旦被消费者收到后,会自动确认收到,并将message从消息缓存中移除。但是实际业务中可能业务处理出现 异常,那么消息就会丢失。
- 手动确认:acknowledge="manual",业务处理成功后,调用channel.basicAck(),进行手动确认,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
- 根据异常情况确认:acknowledge="auto",根据业务系统出现的异常类型,做出不同的处理情况,比较麻烦
3、小结
- 持久化
- exchange持久化
- queue持久化
- message持久化
- 生产者确认confirm
- 消费者确认Ack
- Brocker高可用
二、消息限流
- 在中配置prefetch属性,设置消费者一次性拉取多少条消息
- 消费者的确认模式一定要改为手动确认。acknowledge="manual"
三、TTL
TTL全称Time To Live(存活时间/过期时间)
- 当消息到达存活时间后,还没有被消费,会被自动清除
- RabbitMQ可以对消息设置过期时间,也可以对这个队列设置过期时间
- 如果设置了消息的过期时间,也设置了队列的过期时间,那么会以时间短的为准
- 队列过期后,会将队列所有消息全部移除
- 消息过期后,只有消息在队列顶端的时候(即将被消费的时候),才会判断其是否过期(过期移除)
四、死信队列(DLX)
Dead Letter Exchange(死信交换机)。消息成为Dead Message后,会被发送到DLX。
生产者发送到A交换机后,交换机发送消息到A队列,假设此时交换机中存在设置过期时间的消息,当消息过期后,消息不会直接丢弃,消息会发送到DLX,此时消息再由DLX发送到B队列,供其他消费者进行消费。
1消息成为死信的几种情况:
- 队列消息长度到达限制
- 消费者拒绝消费消息,basicNack/basicReject,不把消息重新放入原目标队列,requeue=false
- 原队列消息存在过期设置,消息到达超时时间未被消费
2如何给队列绑定死信交换机呢?
需要给队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key
五、延迟队列
延迟队列:消息进入队列后不会被立即消费,只有到达指定时间后,才会被消费。
RabbitMQ并未提供延迟队列,但是可以使用TTL+死信队列的方式实现该效果。
六、日志与监控
1、日志
- 日志默认存放在/var/log/rabbitmq/rabbit@xxx.log
- 日志包含了RabbitMQ的版本号、服务节点名称、cookie的哈希值、配置文件地址、内存/磁盘限制、默认账户guest的创建以及权限配置等
2、监控
- 可以通过mq的后台管理页面查看,http://localhost:15672
- 可以通过命令的方式查看
命令 |
注释 |
rabbitmqctl environment |
查看环境变量 |
rabbitmqctl list_users |
查看用户 |
rabbitmqctl list_connections |
查看连接 |
rabbitmqctl list_queues |
查看队列 |
rabbitmqctl list_exchanges |
查看交换机 |
rabbitmqctl list_queues name messages_unacknowledged |
查看未被确认的队列 |
rabbitmqctl list_consumers |
查看消费者信息 |
rabbitmqctl list_queues name memory |
查看单个队列的内存使用 |
rabbitmqctl list_queues name messages_ready |
查看准备就绪的队列 |
七、消息追踪
在使用任何消息中间件的过程中,难免会出现消息丢失的情况,对于RabbitMQ而言
- 可能是因为生产者或者消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制
- 可能是因为交换机与队列之间不同的转发策略
- 可能是交换机并没有与任何队列进行绑定,生产者又不感知或者没有采取相应地措施
- MQ本身的集群策略也可能导致消息丢失
这个时候就需要有一个较好的机制记录消息的投递、消费过程。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。要注意的是生产环境开启功能会导致降低mq的性能。
1、Firehose
Firehose的机制是将生产者投递给rabbitmq的消息/rabbitmq投递给消费者的消息按照指定的格式发送到默认的交换机上。这个默认的交换机是amq.rabbitmq.trace,它是一个topic类型的交换机。发送到这个交换机上的消息的路由key为publish.exchangesname和deliver.queuename。其中exchangesname和queuename为实际交换机和队列的名称,费别对应生产者投递到交换机的消息和消费者从队列上获取的消息。
- rabbitmqctl trace_on:开启Firehose命令
- rabbitmqctl trace_off:关闭Firehose命令
2、rabbitmq_tracing
就实现而言和Firehose如出一辙,只不过rabbitmq_tracing的方式多了 一层GUI的包装,更容易使用和管理。
- rabbitmq-plugins enable rabbitmq_tracing:开启插件