基本介绍
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。
- ① 代表消息从生产者发送到Exchange;
- ② 代表消息从Exchange路由到Queue;
- ③ 代表消息在Queue中存储;
- ④ 代表消费者监听Queue并消费消息;
rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
producer->exchange:确保消息发送到RabbitMQ服务器的交换机上
消息从 producer 到 exchange 则会返回一个 confirmCallback
Confirm模式
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
具体代码设置
- 配置文件application.yml 开启确认模式:spring.rabbitmq.publisher-confirm-type=correlated
- 写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;设置rabbitTemplate的确认回调方法
- rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
@Component public class MessageConfirmCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机收到消息后,会回调该方法 * * @param correlationData 相关联的数据 * @param ack 有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机 * @param cause 消息没有正确地到达交换机的原因是什么 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData = " + correlationData); System.out.println("ack = " + ack); System.out.println("cause = " + cause); if (ack) { //正常 } else { //不正常的,可能需要记日志或重新发送 } } }
发消息参考代码(需要对RabbitTemplate 进行初始化)
@Service public class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private MessageConfirmCallBack messageConfirmCallBack; @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用 public void init() { rabbitTemplate.setConfirmCallback(messageConfirmCallBack); } /** * 发送消息 */ public void sendMessage() { //关联数据对象 CorrelationData correlationData = new CorrelationData(); correlationData.setId("O159899323"); //比如设置一个订单ID,到时候在confirm回调里面,你就可以知道是哪个订单没有发送到交换机上去 rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE + 123, "info", "hello", correlationData); System.out.println("消息发送完毕......"); } }
Transaction(事务)模式
RabbitMQ支持事务(transaction),RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。
- (1)txSelect用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。
- (2)txCommit用于提交事务。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。
- (3)txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
channel.txSelect(); // 将当前channel设置成事务模式 /** * ConfirmConfig.exchangeName(交换机名称) * ConfirmConfig.routingKey(路由键) * message (消息内容) */ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message)); channel.txCommit(); // 提交事务 channel.txRollback(); // 回滚事务
注: 事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发。
事务机制的缺点 :
- 使用事务机制的话会降低RabbitMQ的性能。
- 会导致生产者和RabbitMq之间产生同步(等待确认),这也违背了我们使用RabbitMq的初衷,所以一般很少采用。
exchange -> queue:确保消息从交换机发到队列
可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。使用return模式,可以实现消息无法路由的时候返回给生产者;当然在实际生产环境下,我们不会出现这种问题,我们都会进行严格测试才会上线(很少有这种问题);
消息从 exchange –> queue 投递失败则会返回一个 returnCallback
return模式
开启确认模式;使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;
注意配置文件中,开启 退回模式;
spring.rabbitmq.publisher-returns: true |
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;
@Component public class MessageReturnCallBack implements RabbitTemplate.ReturnsCallback { /** * 当消息从交换机 没有正确地 到达队列,则会触发该方法 * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法 * * @param returned */ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("消息return模式:" + returned); } }
@Service public class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private MessageReturnCallBack messageReturnCallBack; @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用 public void init() { rabbitTemplate.setReturnsCallback(messageReturnCallBack); } /** * 发送消息 */ public void sendMessage() { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello"); System.out.println("消息发送完毕......"); } }
备份交换机(alternate-exchange)
使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上。
备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。到达这个队列以后消费者可以进行处理,通知开发人员进行查看。
当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。
设置参考代码
Map<String, Object> arguments = new HashMap<>(); //指定当前正常的交换机的备用交换机是谁 arguments.put("alternate-exchange", EXCHANGE_ALTERNATE); //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) return new DirectExchange(EXCHANGE, true, false, arguments); //return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();
确保消息在队列正确地存储
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
队列持久化
QueueBuilder.durable(QUEUE).build();
交换机持久化
ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
消息持久化
MessageProperties messageProperties = new MessageProperties(); //设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
集群,镜像队列,高可用
允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行
如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
确保消息从队列正确地投递到消费者
采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);
#开启手动ack消息消费确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;
如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);