【RabbitMQ七】——RabbitMQ发布确认模式(Publisher Confirms)

简介: 【RabbitMQ七】——RabbitMQ发布确认模式(Publisher Confirms)

RabbitMQ发布确认模式

前言

发布确认是解决消息不丢失的重要环节,在设置队列持久化、消息持久化的基础上,设置发布确认,一旦生产者投递消息之后,如果Broker接收到消息,会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker。生产者也可以根据收没有收到这条消息的应答进行相应的处理。

如何实现发布确认

发布者确认在通道级别使用confirmSelect方法启用

Channel channel = connection.createChannel();
channel.confirmSelect();

发布确认模式有三种策略

  1. 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  2. 批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
  3. 异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂

单独发布消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String QUEUE_NAME = "Confirm";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //开启确认模式
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        long start = System.currentTimeMillis();
        //循环发送2条消息
        for (int i = 0; i <200 ; i++) {
            String msg="消息确认模式消息:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // uses a 5 second timeout
            //如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
            channel.waitForConfirmsOrDie(5_000);
        }
        long end = System.currentTimeMillis();
        System.out.println("发送200条消息使用时间:"+(end-start));
    }
}

执行结果


waitForConfirmsOrDie(long)方法等待其确认。该方法在确认消息后立即返回。如果消息在超时内没有得到确认,或者消息被nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送该消息。


这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法交付的吞吐量不会超过每秒发布的几百条消息。

批量发布消息

发布一批消息并等待整个批消息被确认。以100个为例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer2 {
    private static final String QUEUE_NAME = "Confirm";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //用于记录每发布100条消息进行一次确认
        int batchSize = 100;
        int outstandingMessageCount = 0;
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //开启确认模式
        channel.confirmSelect();
        //声明持久化队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        long start = System.currentTimeMillis();
        //循环发送200条消息
        for (int i = 0; i <200 ; i++) {
            String msg="消息确认模式消息:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            outstandingMessageCount++;
            //每100次确认一次
            if (outstandingMessageCount == batchSize) {
                // uses a 5 second timeout
                //如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
                channel.waitForConfirmsOrDie(5_000);
                outstandingMessageCount = 0;
            }
        }
        if (outstandingMessageCount > 0) {
            channel.waitForConfirmsOrDie(5_000);
        }
        long end = System.currentTimeMillis();
        System.out.println("发送200条消息使用时间:"+(end-start));
    }
}

执行结果


与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,最多可提高20-30倍)。一个缺点是,在失败的情况下,我们无法确切地知道哪里出了问题,因此我们可能不得不在内存中保留整个批处理,以记录有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。

异步处理发布确认

异步处理发行商确认通常需要以下步骤:

  1. 提供一种将发布序列号与消息关联起来的方法。
  2. 在通道上注册一个确认侦听器,以便在发布者acks/nacks到达时得到通知,以执行适当的操作,如记录日志或重新发布nack-ed消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
  3. 在发布消息之前跟踪发布序列号。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer3 {
    private static final String QUEUE_NAME = "Confirm";
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //开启确认模式
        channel.confirmSelect();
        //用于保存序列号与消息之前映射的容器
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        //清理映射关系的回调
        //multiple 一个布尔值,如果为false则说明只有一条消息被确认或者丢失,如果为true这表示有小于或等于sequenceNumber的消息被确认或者丢失。
        ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
            if (multiple) {
                //如果包含值为true则返回该映射的部分视图,其键值小于或等于,sequenceNumber
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                        sequenceNumber, true
                );
                //移除所有key和value
                confirmed.clear();
            } else {
                //移除序列号对象的消息
                outstandingConfirms.remove(sequenceNumber);
            }
        };
        //两个回调一个用于确认消息,另一个用于nack-ed消息(可以被代理视为丢失的消息)。
        //sequenceNumber用于标识确认的消息或者丢失的消息
        //
//        channel.addConfirmListener((sequenceNumber,multiple)->{
//            String body = outstandingConfirms.get(sequenceNumber);
//            System.out.println("ack message:"+body);
//        },(sequenceNumber,multiple)->{
//            //当消息被丢失时。。。。
//            String body = outstandingConfirms.get(sequenceNumber);
//            System.out.println("no ack message:"+body);
//        });
        //当消息缺失的回调
        channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
            String body = outstandingConfirms.get(sequenceNumber);
            System.out.println("没有确认的消息:"+body);
            //重新使用回调来清理映射
            cleanOutstandingConfirms.handle(sequenceNumber, multiple);
        });
        //声明持久化队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        long start = System.nanoTime();
        //循环发送2条消息
        for (int i = 0; i <200 ; i++) {
            String msg="消息确认模式消息:"+i;
            //使用map将发布序列号与消息的字符串体关联起来
            //channel.getNextPublishSeqNo()获取下一个消息的序列号
            outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
                System.out.println(msg+"发布成功");
        }
        long end = System.nanoTime();
        System.out.println("发送200条消息使用时间:"+ Duration.ofNanos(end - start).toMillis());
    }
}

执行结果



思考点

如何追踪未完成的确认?

示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构的方便有以下几个原因。

它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列id(以处理多个确认/nack)。

最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。

除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发散列映射和变量来跟踪发布序列的下界,但它们通常更复杂,

重新发布丢失的消息

从相应的回调中重新发布nack-ed消息可能很诱人,但应该避免这种情况,因为确认回调是在I/O线程中分派的,其中通道不应该执行操作。更好的解决方案是将消息放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。

总结

在某些应用程序中,确保发布的消息到达代理非常重要。发布者确认RabbitMQ的特性可以帮助满足这一要求。发布者确认本质上是异步的,但也可以同步地处理它们。没有确定的方法来实现发布者确认,这通常归结于应用程序和整个系统中的约束。典型的技术有:

单独发布消息,同步等待确认:简单,但吞吐量非常有限。

批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。

异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但是实现较为复杂

收获

1.从官网学习rabbitmq的使用,严格落实先宏观,后微观,结合思维导图和三遍读书法.

2.学习的过程中,锤炼英文阅读,动手查字典和利用工具.

3. 理论和实践结合,理论学习完成之后,及时进行实践包括先代码实现,阶段总结等等.

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
25天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
50 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
96 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
93 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
86 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决