RabbitMQ 可靠投递

简介: RabbitMQ 可靠投递 标签: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投递 背景 confirmCallback 确认模式 returnCallback 未投递到 queue 退回模式 shovel-plugin 跨机房可靠投递 背景 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
  • 背景
  • confirmCallback 确认模式
  • returnCallback 未投递到 queue 退回模式
  • shovel-plugin 跨机房可靠投递

背景

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。__RabbitMQ__ 为我们提供了两个选项用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 则会返回一个 confirmCallback
messageexchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。

confirmCallback 确认模式

在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我们来看下 ConfirmCallback 接口。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,__CorrelationData__ 对象内部只有一个 id 属性,用来表示当前消息唯一性。

发送的时候创建一个 CorrelationData 对象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id 。当然这里是纯粹 demo__,真实场景是需要做业务无关消息 __ID 生成,同时要记录下这个 id 用来纠错和对账。

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback__,如果是 __cluster 模式,需要所有 broker 接收到才会调用 __confirmCallback__。

broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

returnCallback 未投递到queue退回模式

confrim 模式只能保证消息到达 broker__,不能保证消息准确投递到目标 __queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。

同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件非常方便,__shovel__ 可以接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker

10.211.55.3 rabbit_node1

10.211.55.4 rabbit_node2

我们希望将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。我们先开启 rabbit_node1 的 __shovel-plugin__。

先看下当前 RabbitMQ 版本是否安装了 __shovel-plugin__,如果有的话直接开启。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,___amqp://user:password@server-name/my-vhost___ 。

如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 rabbit_node2 broker


我们来看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 创建了两个 tcp 连接,__端口 39544__ 连接是用来消费 plen.queue 里的消息,__端口 55706__ 连接是用来推送消息给 rabbit_node2

我们来看下 rabbit_node1 tcp 连接状态:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 连接状态:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

为了验证 shovel-plugin 稳定性,我们将 rabbit_node2 下线。

然后再发送消息,发现消息会现在 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 连接恢复将消费 rabbit_node1 plen.queue 消息,然后投递给 rabbit_node2 plen.queue

作者:王清培 (沪江集团资深JAVA架构师)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 调度 UED
百度搜索:蓝易云【利用RabbitMQ实现消息投递削峰填谷】
总之,通过利用RabbitMQ实现消息投递削峰填谷,我们可以提高系统的可伸缩性和稳定性,有效应对高并发场景。这种方案可以帮助提升系统的性能和用户体验,是一种可行且有效的解决方案。
47 0
|
消息中间件 存储 缓存
RabbitMQ之消息可靠性投递解读
RabbitMQ之消息可靠性投递解读
|
消息中间件
rabbitmq消息的可靠性投递
rabbitmq消息的可靠性投递
|
消息中间件 Java 数据库
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
364 0
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
|
消息中间件 Java 关系型数据库
SpringBoot RabbitMQ实现消息可靠投递
SpringBoot RabbitMQ实现消息可靠投递
SpringBoot RabbitMQ实现消息可靠投递
|
消息中间件 网络架构
面试官:RabbitMQ怎么实现消费的可靠投递
本文讲解RabbitMQ如何实现消费的可靠投递。
112 0
面试官:RabbitMQ怎么实现消费的可靠投递
|
消息中间件 NoSQL Java
JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题
我们发送一个消息没办法知道我们发的消息消费端是否接收到,假如消费端没有接收到那么我们需要触发补偿机制来重新发送一个消息,这个时候我们为了解决这个问题就需要将消息落库,每次将准备发送的消息存入到数据库中,并设置一个状态为待发送。 等消费端接收到消息并给我们反馈后,我们将数据库中的消息状态改为已完成。
246 0
JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题
|
消息中间件 存储 SQL
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
639 0
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
|
消息中间件 NoSQL 算法
RabbitMQ 消息 100% 投递的解决方案!
现在大多都使用 MQ 来做系统的异构,来做系统的解耦,系统的的模块相当于寄信者与收信者,MQ 则扮演者邮局的角色。作为一个中转的角色,就需要确保消息的100%投递。 今天我们就来研究一下如何确保消息的100%的投递。
200 0
RabbitMQ 消息 100% 投递的解决方案!