RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。



手动应答代码

消息手动应答(生产者)

public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消息手动应答(消费者)

public class Work03 {
    //队列名称
    public static final String TASK_QUEUE_NAME="ack_queue";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumeTag,message)->{
            //沉睡1S
            try {
                SleepUtils.sleep(1);
                System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
                //手动应答
                //1.消息的标记 tag
                //2. 是否批量应答 false:不批量应答通信道中的消息 true:批量
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        //采用手动应答
        boolean antoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,antoAck,deliverCallback,(consumerTag->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}
public class Work04 {
    //队列名称
    public static final String TASK_QUEUE_NAME="ack_queue";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumeTag,message)->{
            //沉睡1S
            try {
                SleepUtils.sleep(30);
                System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
                //手动应答
                //1.消息的标记 tag
                //2. 是否批量应答 false:不批量应答通信道中的消息 true:批量
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        //采用手动应答
        boolean antoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,antoAck,deliverCallback,(consumerTag->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}

消息手动应答(结果成功)






RabbitMQ持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当Rabbi1MQJ服务停掉以后消息生产者发送过来的消息不丢失。默认情况下RahbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢尖需要做两件事:我们需要将队列和消息都标记为持久化。

当队列持久化的时候

此处会显示D,这个时候即使重启rabbitmq队列消息也依然存在,但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

队列实现持久化

public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        boolean durable = true;//需要让Queue进行持久化
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消息实现持久化

我们上面说到的队列持久化,只能保证队列不丢失,但不能保证消息不丢失,所以我们还需要给消息添加一个持久化,要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN添加这个属性

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强

不公平分发

在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数channel.basicQos(1);



当这里设置成1的时候,就说明现在这个队列是不公平分发
如何更改不公平分发呢?


我们只需给消费者设置这样一个参数

预取值


这里如果是0的话是轮训分发,1的话是不公平分发,其它大于1值的话就是预取值,可以事先规定好给该队列分配几条数据

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 网络协议
RabbitMQ消息的应答
RabbitMQ消息的应答
56 0
|
6月前
|
消息中间件 存储 缓存
RabbitMQ之消息应答和持久化
【1月更文挑战第11天】 一、消息应答 1.概念 2.自动应答 3.消息应答方法 4.Multiple 的解释 5.消息自动重新入队 6.消息手动应答代码 7.手动应答效果演示 二、RabbitMQ持久化 1.概念 2.队列如何实现持久化 3.消息实现持久化 4.不公平分发 5.预取值
302 7
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
678 1
|
2月前
|
消息中间件 存储 JSON
RocketMQ 消费进度持久化
本文介绍了RocketMQ中消费进度的持久化机制,包括普通消息和延迟消息的消费偏移量是如何存储的。普通消息的消费进度存储于`consumerOffset.json`文件,格式为`{Topic}@{ConsumerGroup}`,而延迟消息则存储于`delayOffset.json`文件,以`{delayLevel:offset}`的形式记录。文章详细分析了相关文件内容及代码实现,并指出Broker分别以5秒和10秒的间隔进行持久化操作。
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
66 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ中的消息持久化是如何实现的?
RabbitMQ中的消息持久化是如何实现的?
112 0
|
消息中间件
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
224 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
120 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
479 0