RabbitMQ学习(三):工作队列

简介: 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

一、工作队列原理



工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务


二、案例演示


     

在这个案例中我将启动两个工作线程、一个消息发送线程,看看这两个工作线程是如何工作的。

84a5cfd2b16f46a3932cd18f763e82c1.png

1、抽取工具类


public class RabbitMQUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.242.xx.xx");    //填写自己的ip地址
        factory.setUsername("root");        //填写自己的用户名和密码
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}


2、工作线程代码准备


package com.shang.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.shang.utils.RabbitMQUtils;
/**
 * 这是一个工作线程(相当于之前的消费者)
 */
public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        /**
         * 消费者接受消息 channel.basicConsume()
         * 参数:
         * 1 消费哪个队列
         * 2 消费成功后是否要自动应答
         * 3 消费者未成功消费的回调
         * 4 消费者取录消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}


启动之后,开启多线程

6955ec7dfaa94af69b9c906dfe29874d.pngab694d6619544ef895f8b23d74b340f2.png


为了区别两个工作线程,将工作线程的代码改为C2,再次启动

65df153d387442c78bf9f25d6f16c16e.png


此时我们可以看到下方有两个线程在运行,一个是C1,一个是C2:

46cde0b86a284a28b178dbda95cee56e.png


3、消息发送线程代码准备


public class Task01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMQUtils.getChannel();) {
            /**
             * 生成一个队列
             * 参数:
             * 1 队列名称
             * 2 队列里的消息是否持久化,默认情况消息存储在内存中
             * 3 该队列是否供多个消费者进行消费,是否进行消息共享
             * 4 是否自动删除
             * 5 其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    } }


4、启动消息发送线程,发送消息

     

发送AA

fa7ad0c3bbdc481bb0a73c99812787e5.png


C1接收到

19b565f67ace4556bfbef05757f6736a.png


C2并没有接收到f9f02de99ab2431b9cf3989410e7bc5e.png


继续发送消息BB、CC、DD

ab12537068f9470cb2a8b814327f7fd1.png


可以看到,C1接收到了AA和CC,C2接收到了BB和DD

054cffe899af496a9c97ff41c4e3557c.pnga6945e07cfbb4dbfa2b50c9ed28e693c.png


可以看出,当有大量消息发送出来时,如果有多个工作队列,他们可以共同接收消息,你一条我一条。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
27天前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
98 6
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
81 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
79 2
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
66 0
说说RabbitMQ延迟队列实现原理?
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
61 0
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
121 1