【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)

简介: 【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)

RabbitMQ工作队列模式

为什么要有工作队列模式




1.rabbitmq所有模式都具备的优点:避免立即执行资源密集型型任务,并且不得不等待它完成。我们可以将这个资源密集型任务安排再以后完成。将任务封装为消息并将其发送到队列中,后台运行工作进行将从队列中获取任务,并执行任务。

2.在工作队列模式中可以有多个worker时,任务将在他们之间共享,可以并行处理任务。例如队列中一共有10个任务,有两个worker,他们将共同完成10个任务。可以是第一个worker完成4个任务,第二个worker完成6个任务。又或者是第一个worker完成5个任务,第二个worker完成5个任务。并行处理任务,大大缩短了时间,提高了执行效率。

如何使用工作队列模式

示例业务:一个生产者,生产10条消息(任务)并放入工作队列中。两个消费者(也可以理解为工人)共同执行消息(任务)

RabbitMQUtils工具类,用于创建连接和信道

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {
    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //RabbitMQ服务器ip地址
        factory.setHost("ip");
        //端口号  默认值为5672
        factory.setPort(5672);
        //为用户分配的虚拟主机 默认值为/
        factory.setVirtualHost("/");
        //用户名  默认为guest
        factory.setUsername("用户名");
        //密码   默认为guest
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }
    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}

Producer生产者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者1]
 * @createTime : [2023/1/17 8:48]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:48]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接RabbitMQ
        RabbitMQUtils.getConnection();
        //获取信道
        Channel channel = RabbitMQUtils.getChannel();
        /*
        * TASK_QUEUE_NAME 队列名称
        * durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
        * exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
            (这里需要注意三点:1.排他队列是基于连接Connection可见的,
            同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
            "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
            这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
            该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)
        *autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
            之后所有与这个队列连接的消费者都断开时,才会自动删除。
            不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
            因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
        *arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
        *
        * */
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //发送10条消息
        for (int i = 0; i <10 ; i++) {
            String message="廊坊师范..."+i;
            /*
            *交换机命名,不填写使用默认的交换机
            * routingKey -路由键道具-消息的其他属性-路由头等正文
            * 消息正文
            * **/
            //推送消息
            channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

ConsumerOne消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

ConsumerTwo消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

运行程序

先启动两个消费者,再启动生产者

可以看到生产者发送了10条消息

消费者1和消费者2分别消费了5条消息





轮询

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

从上面我们的示例中就可以看出,两个消费者分别消费了5条消息,并且都是消息的消费都是按照顺序进行消费。

消息确认

执行一个任务可能需要几秒钟的时间,您可能想知道如果使用者启动了一个较长的任务,并且在完成之前终止了该任务会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给用户,它就会立即将其标记为删除。在这种情况下,如果终止一个消费者,它刚刚处理的消息就会丢失。发送给这个特定消费者但尚未处理的消息也会丢失。


但是我们不想失去任何消息。如果一个消费者死了,我们希望将任务传递给另一个消费者。


为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个确认信息被消费者发送回来,告诉RabbitMQ已经接收、处理了一个特定的消息,并且RabbitMQ可以自由删除它。


如果一个消费者死了(它的通道关闭了,连接关闭了,或者TCP连接丢失了)而没有发送ack, RabbitMQ会理解消息没有被完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新发送给另一个消费者。这样你就可以确保没有信息丢失,即使消费者偶尔死亡。


对消费者交付确认强制执行时间(默认为30分钟)。这有助于检测从不确认交付的错误(卡住)消费者。您可以根据交付确认超时增加此超时时间。


默认情况下,手动消息确认是打开的。一旦我们完成了一个任务,是时候将这个标志设置为false并从worker发送一个适当的确认。

示例代码:

boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

消费者完整代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

使用这段代码,您可以确保即使使用CTRL+C终止一个正在处理消息的消费者,也不会丢失任何东西。在worker终止后不久,所有未确认的消息都将被重新传递。
确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。

验证消息确认

验证步骤:启动加入消息确认的两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。

消费者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

生产者:


消费者1

可以看到消费者1在收到第六条消息之后再处理过程中,消费者1被中断了。第六条消息并没有执行完。


消费者2

可以看到第六条消息被重新传递给了消费者2,第六条消息并没有丢失。


再次验证:将两个消费者的消息确认代码除去再进行验证。启动两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。

消费者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

生产者:


消费者1:

消费者1接收到第七条消息时消费者1被中断。


消费者2:

第七条消息并没有被重新发送给消费者2,第七条消息丢失了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
27天前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
100 6
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
81 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
80 2
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
65 2
|
3月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
55 1
|
3月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式