【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)

简介: 【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)

消息持久化

我们已经学会了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,我们的消息仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。


首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要声明它是持久的

boolean durable = true;
channel.queueDeclare("task_queue2", durable, false, false, null);

注意:
RabbitMQ不允许你用不同的参数重新定义一个现有的队列,并且会向任何试图这样做的程序返回一个错误。

这个queueDeclare更改需要同时应用于生产者和使用者代码。
此时我们可以确定task_queue队列不会丢失,即使RabbitMQ重新启动。现在我们需要将消息标记为持久消息——通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue2",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

公平调度

您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。RabbitMQ对此一无所知,它仍然会均匀地分发消息。

这是因为RabbitMQ只是在消息进入队列时分派消息。它不会查看消费者的未确认消息的数量。它只是盲目地将每n个消息发送给第n个消费者。


为了克服这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工作人员。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

消费者完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        int prefetchCount = 1;
    channel.basicQos(prefetchCount);
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

验证公平调度

现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。

生产者完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者1]
 * @createTime : [2023/1/17 8:48]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:48]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接RabbitMQ
        RabbitMQUtils.getConnection();
        //获取信道
        Channel channel = RabbitMQUtils.getChannel();
        /*
        * TASK_QUEUE_NAME 队列名称
        * durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
        * exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
            (这里需要注意三点:1.排他队列是基于连接Connection可见的,
            同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
            "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
            这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
            该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)
        *autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
            之后所有与这个队列连接的消费者都断开时,才会自动删除。
            不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
            因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
        *arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
        *
        * */
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //发送10条消息
        for (int i = 0; i <10 ; i++) {
            String message="廊坊师范..."+i;
            /*
            *交换机命名,不填写使用默认的交换机
            * routingKey -路由键道具-消息的其他属性-路由头等正文
            * 消息正文
            * **/
            //推送消息
            channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者1完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

消费者2完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

执行结果:

消费者1:


消费者2:


结论:

有两个消费者的情况下,一个消费者1一直很忙(完成一条消息需要3秒),消费者2一致很轻松(完成一条消息需要1秒)。

RabbitMQ对此一无所知,它仍然会均匀地分发消息。导致消费者1和消费者2执行了同样的消息数量。

现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**

消费者1完整代码

        import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     * @author : [WangWei]
     * @version : [v1.0]
     * @className : ConsumerOne
     * @description : [消费者1]
     * @createTime : [2023/1/17 9:07]
     * @updateUser : [WangWei]
     * @updateTime : [2023/1/17 9:07]
     * @updateRemark : [描述说明本次修改内容]
     */
    public class ConsumerOne {
        private static final String TASK_QUEUE_NAME = "task_queue1";
        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQUtils.getConnection();
            Channel channel = RabbitMQUtils.getChannel();
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            channel.basicQos(1);
            //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        }
        /*
         * @version V1.0
         * Title: doWork
         * @author Wangwei
         * @description 模拟任务的执行时间
         * @createTime  2023/1/18 9:21
         * @param [task]
         * @return void
         */
        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                //如果消息中存在.则1秒之后继续,有几个.停止几秒
                if (ch == '.') {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

消费者2完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

执行结果:

消费者1:


消费者2:


结论:

置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的消费者。

总结

学习一门知识需要亲自动手去验证去证明这种方式是可行了,这样对于这个知识点才算是理解的更深。按照这样做一定行,而不是应该可以,大概可以吧。

如果博主的文章对您有所帮助,可以评论、点赞、收藏,支持一下博主!!!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
65 2
|
3月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
55 1
|
3月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
4月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
3月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
59 0
|
4月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
|
5月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
119 0