RabbitMQ学习(三):工作队列

简介: 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

一、工作队列原理



工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务


二、案例演示


     

在这个案例中我将启动两个工作线程、一个消息发送线程,看看这两个工作线程是如何工作的。

84a5cfd2b16f46a3932cd18f763e82c1.png

1、抽取工具类


public class RabbitMQUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.242.xx.xx");    //填写自己的ip地址
        factory.setUsername("root");        //填写自己的用户名和密码
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}


2、工作线程代码准备


package com.shang.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.shang.utils.RabbitMQUtils;
/**
 * 这是一个工作线程(相当于之前的消费者)
 */
public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        /**
         * 消费者接受消息 channel.basicConsume()
         * 参数:
         * 1 消费哪个队列
         * 2 消费成功后是否要自动应答
         * 3 消费者未成功消费的回调
         * 4 消费者取录消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}


启动之后,开启多线程

6955ec7dfaa94af69b9c906dfe29874d.pngab694d6619544ef895f8b23d74b340f2.png


为了区别两个工作线程,将工作线程的代码改为C2,再次启动

65df153d387442c78bf9f25d6f16c16e.png


此时我们可以看到下方有两个线程在运行,一个是C1,一个是C2:

46cde0b86a284a28b178dbda95cee56e.png


3、消息发送线程代码准备


public class Task01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMQUtils.getChannel();) {
            /**
             * 生成一个队列
             * 参数:
             * 1 队列名称
             * 2 队列里的消息是否持久化,默认情况消息存储在内存中
             * 3 该队列是否供多个消费者进行消费,是否进行消息共享
             * 4 是否自动删除
             * 5 其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    } }


4、启动消息发送线程,发送消息

     

发送AA

fa7ad0c3bbdc481bb0a73c99812787e5.png


C1接收到

19b565f67ace4556bfbef05757f6736a.png


C2并没有接收到f9f02de99ab2431b9cf3989410e7bc5e.png


继续发送消息BB、CC、DD

ab12537068f9470cb2a8b814327f7fd1.png


可以看到,C1接收到了AA和CC,C2接收到了BB和DD

054cffe899af496a9c97ff41c4e3557c.pnga6945e07cfbb4dbfa2b50c9ed28e693c.png


可以看出,当有大量消息发送出来时,如果有多个工作队列,他们可以共同接收消息,你一条我一条。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
93 0
|
2月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
28 4
|
15天前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
|
29天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
32 0
|
1月前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
1月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
20 0
|
1月前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
75 1
|
1月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
18 0
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
45 1