RabbitMQ发布确认模式
前言
发布确认是解决消息不丢失的重要环节,在设置队列持久化、消息持久化的基础上,设置发布确认,一旦生产者投递消息之后,如果Broker接收到消息,会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker。生产者也可以根据收没有收到这条消息的应答进行相应的处理。
如何实现发布确认
发布者确认在通道级别使用confirmSelect方法启用
Channel channel = connection.createChannel(); channel.confirmSelect();
发布确认模式有三种策略
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
- 异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂
单独发布消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : Producer * @description : [生产者] * @createTime : [2023/2/1 9:38] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:38] * @updateRemark : [描述说明本次修改内容] */ public class Producer { private static final String QUEUE_NAME = "Confirm"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //建立连接 RabbitMQUtils.getConnection(); //声明通道 Channel channel = RabbitMQUtils.getChannel(); //开启确认模式 channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); long start = System.currentTimeMillis(); //循环发送2条消息 for (int i = 0; i <200 ; i++) { String msg="消息确认模式消息:"+i; /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // uses a 5 second timeout //如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。 channel.waitForConfirmsOrDie(5_000); } long end = System.currentTimeMillis(); System.out.println("发送200条消息使用时间:"+(end-start)); } }
执行结果
waitForConfirmsOrDie(long)方法等待其确认。该方法在确认消息后立即返回。如果消息在超时内没有得到确认,或者消息被nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送该消息。
这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法交付的吞吐量不会超过每秒发布的几百条消息。
批量发布消息
发布一批消息并等待整个批消息被确认。以100个为例
import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : Producer * @description : [生产者] * @createTime : [2023/2/1 9:38] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:38] * @updateRemark : [描述说明本次修改内容] */ public class Producer2 { private static final String QUEUE_NAME = "Confirm"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //用于记录每发布100条消息进行一次确认 int batchSize = 100; int outstandingMessageCount = 0; //建立连接 RabbitMQUtils.getConnection(); //声明通道 Channel channel = RabbitMQUtils.getChannel(); //开启确认模式 channel.confirmSelect(); //声明持久化队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); long start = System.currentTimeMillis(); //循环发送200条消息 for (int i = 0; i <200 ; i++) { String msg="消息确认模式消息:"+i; /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); outstandingMessageCount++; //每100次确认一次 if (outstandingMessageCount == batchSize) { // uses a 5 second timeout //如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。 channel.waitForConfirmsOrDie(5_000); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { channel.waitForConfirmsOrDie(5_000); } long end = System.currentTimeMillis(); System.out.println("发送200条消息使用时间:"+(end-start)); } }
执行结果
与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,最多可提高20-30倍)。一个缺点是,在失败的情况下,我们无法确切地知道哪里出了问题,因此我们可能不得不在内存中保留整个批处理,以记录有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。
异步处理发布确认
异步处理发行商确认通常需要以下步骤:
- 提供一种将发布序列号与消息关联起来的方法。
- 在通道上注册一个确认侦听器,以便在发布者acks/nacks到达时得到通知,以执行适当的操作,如记录日志或重新发布nack-ed消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
- 在发布消息之前跟踪发布序列号。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : Producer * @description : [生产者] * @createTime : [2023/2/1 9:38] * @updateUser : [WangWei] * @updateTime : [2023/2/1 9:38] * @updateRemark : [描述说明本次修改内容] */ public class Producer3 { private static final String QUEUE_NAME = "Confirm"; public static void main(String[] args) throws IOException, TimeoutException { //建立连接 RabbitMQUtils.getConnection(); //声明通道 Channel channel = RabbitMQUtils.getChannel(); //开启确认模式 channel.confirmSelect(); //用于保存序列号与消息之前映射的容器 ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); //清理映射关系的回调 //multiple 一个布尔值,如果为false则说明只有一条消息被确认或者丢失,如果为true这表示有小于或等于sequenceNumber的消息被确认或者丢失。 ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { if (multiple) { //如果包含值为true则返回该映射的部分视图,其键值小于或等于,sequenceNumber ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap( sequenceNumber, true ); //移除所有key和value confirmed.clear(); } else { //移除序列号对象的消息 outstandingConfirms.remove(sequenceNumber); } }; //两个回调一个用于确认消息,另一个用于nack-ed消息(可以被代理视为丢失的消息)。 //sequenceNumber用于标识确认的消息或者丢失的消息 // // channel.addConfirmListener((sequenceNumber,multiple)->{ // String body = outstandingConfirms.get(sequenceNumber); // System.out.println("ack message:"+body); // },(sequenceNumber,multiple)->{ // //当消息被丢失时。。。。 // String body = outstandingConfirms.get(sequenceNumber); // System.out.println("no ack message:"+body); // }); //当消息缺失的回调 channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { String body = outstandingConfirms.get(sequenceNumber); System.out.println("没有确认的消息:"+body); //重新使用回调来清理映射 cleanOutstandingConfirms.handle(sequenceNumber, multiple); }); //声明持久化队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); long start = System.nanoTime(); //循环发送2条消息 for (int i = 0; i <200 ; i++) { String msg="消息确认模式消息:"+i; //使用map将发布序列号与消息的字符串体关联起来 //channel.getNextPublishSeqNo()获取下一个消息的序列号 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); /*推送消息 *交换机命名,不填写使用默认的交换机 * routingKey -路由键- * props:消息的其他属性-路由头等正文 * msg消息正文 */ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println(msg+"发布成功"); } long end = System.nanoTime(); System.out.println("发送200条消息使用时间:"+ Duration.ofNanos(end - start).toMillis()); } }
执行结果
思考点
如何追踪未完成的确认?
示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构的方便有以下几个原因。
它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列id(以处理多个确认/nack)。
最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发散列映射和变量来跟踪发布序列的下界,但它们通常更复杂,
重新发布丢失的消息
从相应的回调中重新发布nack-ed消息可能很诱人,但应该避免这种情况,因为确认回调是在I/O线程中分派的,其中通道不应该执行操作。更好的解决方案是将消息放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。
总结
在某些应用程序中,确保发布的消息到达代理非常重要。发布者确认RabbitMQ的特性可以帮助满足这一要求。发布者确认本质上是异步的,但也可以同步地处理它们。没有确定的方法来实现发布者确认,这通常归结于应用程序和整个系统中的约束。典型的技术有:
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但是实现较为复杂
收获
1.从官网学习rabbitmq的使用,严格落实先宏观,后微观,结合思维导图和三遍读书法.
2.学习的过程中,锤炼英文阅读,动手查字典和利用工具.
3. 理论和实践结合,理论学习完成之后,及时进行实践包括先代码实现,阶段总结等等.