RabbitMQ中的消息确认机制是什么?为什么需要消息确认?

简介: RabbitMQ中的消息确认机制是什么?为什么需要消息确认?

RabbitMQ中的消息确认机制是什么?为什么需要消息确认?

RabbitMQ中的消息确认机制是指生产者发送消息后,等待消费者确认消息已经被正确接收和处理的一种机制。消息确认机制的主要目的是确保消息的可靠传递和处理,以避免消息丢失或重复处理的情况发生。

为什么需要消息确认机制呢?在分布式系统中,消息的发送和接收是异步的过程,可能会存在以下情况:

  1. 消息丢失:在消息发送过程中,可能由于网络故障、硬件故障或其他原因导致消息丢失。如果没有消息确认机制,生产者无法得知消息是否成功传递给消费者,从而无法保证消息的可靠性。
  2. 消息重复:在消息发送过程中,可能由于网络超时、消费者故障或其他原因导致消息重复发送。如果没有消息确认机制,消费者可能会多次处理同一条消息,导致重复操作和数据不一致的问题。

为了解决以上问题,RabbitMQ引入了消息确认机制。消息确认机制包括两个重要的概念:发布确认(Publish Confirm)和消费确认(Consumer Acknowledgement)。

发布确认是指生产者发送消息后,等待RabbitMQ服务器返回确认消息的过程。生产者可以通过调用channel.confirmSelect()方法启用发布确认模式,然后使用channel.waitForConfirms()方法等待服务器返回确认消息。如果服务器成功接收到消息并进行处理,会返回一个确认消息给生产者。

下面是一个使用Java编写的代码案例,演示了如何使用消息确认机制:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class MessageConfirmationExample {
    private final static String QUEUE_NAME = "my_queue";
    private final static String MESSAGE = "Hello, RabbitMQ!";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 启用发布确认模式
        channel.confirmSelect();
        // 创建一个有序集合,用于保存未确认的消息的Delivery Tag
        SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<>());
        // 添加发布确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // 多条消息被确认,移除所有小于等于deliveryTag的消息
                    unconfirmedSet.headSet(deliveryTag + 1).clear();
                } else {
                    // 单条消息被确认,移除该消息
                    unconfirmedSet.remove(deliveryTag);
                }
                System.out.println("Message confirmed: " + deliveryTag);
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // 多条消息未被确认,重新发送所有小于等于deliveryTag的消息
                    unconfirmedSet.headSet(deliveryTag + 1).forEach(tag -> {
                        try {
                            sendMessage(channel, tag);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                } else {
                    // 单条消息未被确认,重新发送该消息
                    sendMessage(channel, deliveryTag);
                }
                System.out.println("Message not confirmed: " + deliveryTag);
            }
        });
        // 发送消息
        for (int i = 1; i <= 10; i++) {
            long deliveryTag = sendMessage(channel, i);
            unconfirmedSet.add(deliveryTag);
        }
        // 等待所有消息被确认
        try {
            channel.waitForConfirmsOrDie();
            System.out.println("All messages confirmed!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 关闭通道和连接
        channel.close();
        connection.close();
    }
    private static long sendMessage(Channel channel, int messageNumber) throws IOException {
        long deliveryTag = channel.getNextPublishSeqNo();
        channel.basicPublish("", QUEUE_NAME, null, (MESSAGE + " " + messageNumber).getBytes());
        System.out.println("Sent message: " + messageNumber);
        return deliveryTag;
    }
}

在上面的代码中,首先我们创建了一个连接工厂,并设置RabbitMQ服务器的主机地址。然后,我们使用连接工厂创建了一个连接,并使用连接创建了一个通道。接下来,我们声明了一个名为"my_queue"的队列。然后,我们启用了发布确认模式,通过调用channel.confirmSelect()方法。同时,我们创建了一个有序集合unconfirmedSet,用于保存未确认的消息的Delivery Tag。

然后,我们添加了一个发布确认监听器ConfirmListener,在监听器中实现了handleAck和handleNack方法。当消息被确认时,handleAck方法会被调用,我们可以在该方法中处理确认的逻辑,例如从unconfirmedSet中移除已确认的消息。当消息未被确认时,handleNack方法会被调用,我们可以在该方法中处理未确认的逻辑,例如重新发送未确认的消息。

接着,我们使用sendMessage方法发送了10条消息,并将每条消息的Delivery Tag保存到unconfirmedSet中。然后,我们使用channel.waitForConfirmsOrDie()方法等待所有消息被确认。如果在指定的时间内没有收到所有消息的确认消息,会抛出InterruptedException异常。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 监控
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
416 1
|
7月前
|
消息中间件 存储 运维
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
105 0
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
105 0
|
5月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
71 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
63 0
|
6月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。