你了解RabbitMQ吗?我这就教你学会Work-Queue模式下的RabbitMQ。

简介: 教你学会Work-Queue模式下的RabbitMQ
🏇 小肖来了
🍣 今天给大家带来的文章是《消息队列的 Work-Queues 篇》🍣
🍣 这是RabbitMQ的另一种模式🍣
🍣 希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏 博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗 同时也非常感谢各位小伙伴们的支持💗

1、Work Queues

在这里插入图片描述

  • 工作队列的主要思想是避免立即执行密集型任务,而不得不等待它完成。我们把任务发送到队列中,后台进程将消息从队列中弹出执行,如果后台有多个工作进程的话,这些工作进程轮询(就是你一条、我一条、他一条)进行处理这些任务

1.1、轮询发送消息

1.1.1、抽取工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqUtil {
    /**
     * 连接工厂获取信道的工具类
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置IP地址
        factory.setHost("192.168.123.129");
        // 用户名
        factory.setUsername("admin");
        // 密码
        factory.setPassword("123");


        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();

        return channel;
    }
}

1.1.2、工作线程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();

        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println(new String(var2.getBody()));
        };

        CancelCallback cancelCallback = var1->{
            System.out.println(var1 + "消费消息被中断了");
        };
        /**
         * 接收消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

1.1.3、制造多个工作线程

在这里插入图片描述

  • 我们通过更改IDEA的配置进行启动多个工作线程。

1.1.4、生产者

import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();
        /**
         * 声明队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化(磁盘)默认情况下消息存储在内存中
         * 3.该队列是否提供一个消费者进行消费,就是是否进行消息共享
         * 4.就是当最后一个消费者断开连接之后,该队列是否自动删除消息
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        Scanner sc = new Scanner(System.in);
        while(sc.hasNext()){
            String message = sc.next();

            /**
             * 发送一个消息
             * 1.发送到哪个交换机
             * 2.路由的Key值,也就是本次队列的名称
             * 3.其他参数信息
             * 4.发送消息的消息内容
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送消息成功:" + message);
        }
    }
}

1.1.5、最终测试

1. 生产者发送消息

AA
发送消息成功:AA
BB
发送消息成功:BB
CC
发送消息成功:CC
DD
发送消息成功:DD

2. 测试结果

W1的控制台输出

在这里插入图片描述

W2的控制台输出

在这里插入图片描述

1.2、消息应答

1.2.1、概念

  • 可能出现的问题:如果消费者完成一段任务需要一段时间,如果其中一个消费者处理一个很长的任务并只是完成了一部分就挂掉了,那么我们将丢失一部分消息。
  • 为了保证消息在发送过程中不丢失,rabbitmq引入了消息应答机制。
  • 消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了。

1.2.2、自动应答

消息发送后立即被认为已经发送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。这种模式仅适合于在消费者可以高效并以某种速率能够处理这些消息的情况下使用

1.2.3、手动应答的方法

// 用于肯定确认,RabbitMQ已知道消息并且成功的处理消息,可以将其丢弃了。
Channel.basicAck()

// 用于否定确认
Channel.basicNack()
Channel.basicReject()

// 后者比前者多一个参数,不处理消息了直接拒绝,可以将其对其

1.2.4、Multiple的解释

在这里插入图片描述

  • Multiple = true 表示可以批量应答,就好像到8了,可以对5,6,7都进行应答。

在这里插入图片描述

  • Multiple = true 表示不可以批量应答,就好像到8了,只可以对8进行应答,不可以对5,6,7都进行应答。

1.2.5、消息自动重新入队

在这里插入图片描述

  • 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将其重新入对。如果此时其他消费者可以处理,它将很快重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
  • 如果所示,C1收到消息1之后为发送ack就断开链接了,其消息1会重新入队,并被C2给处理。

1.2.6、生产者代码

import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Task2 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

        Scanner sc = new Scanner(System.in);
        while(sc.hasNext()){
            String next = sc.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,next.getBytes("UTF-8"));
            System.out.println("成功发送的消息是:" + next);
        }
    }
}

1.2.7、消费者代码

1. 工作线程1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();
        System.out.println("C1正准备接受消息处理时间较短");
        
        DeliverCallback deliverCallback = (var1,var2)->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
            /**
             * 1. 消息的标签
             * 2. 是否进行批量应答
             */
            channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
        };

        channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
            System.out.println("消息处理失败进行函数回调");
        }));
    }
}

2. 工作线程2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work04 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();
        System.out.println("C2正准备接受消息处理时间较长");
        
        DeliverCallback deliverCallback = (var1,var2)->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
            /**
             * 1. 消息的标签
             * 2. 是否进行批量应答
             */
            channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
        };

        channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
            System.out.println("消息处理失败进行函数回调");
        }));
    }
}

3、工具类


public class SleepUtils {

    public static void sleep(int seconds){
        try {
            seconds *= 1000;
            Thread.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

}

1.2.8、效果演示

  • 生产者分别发送消息:aa、bb、cc、dd
  • 当工作进程2接受dd消息的时候宕机了,那么消息不会丢失,而是转发给工作进程1
  1. 生产者

在这里插入图片描述

  1. 工作进程1

在这里插入图片描述

  1. 工作进程2

在这里插入图片描述

1.3、持久化

1.3.1、概念

  • 我们如何保障当RabbitMQ服务停止之后消息生产者发送过来的消息不丢失。确保消息不会丢失需要做两件事情:我们需要将队列和消息都标记为持久化

1.3.2、队列如何实现持久化

  • 在我们没有将队列设置为持久化状态的时候,如果rabbitmq重启的话,该队列就会被删除,如果队列要实现持久化,需要将声明队列的地方设置为持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
  • 但是需要注意的是如果之前的队列不是持久化的,需要把原先的队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

在这里插入图片描述

重启之后

在这里插入图片描述

1.3.3、消息如何进行持久化

  • 要想让消息实现持久化需要将消息生产者代码修改,如下所示。
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
  • 将消息标记为持久化并不能完全保证不会丢失消息尽管这里它告诉RabbitMQ将消息保存在磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘,持久性保证并不强

1.3.4、不公平分发

  • RabbitMQ分发消息默认采用轮训分发机制。但是如果我们的工作进程对任务的处理速度不一样的话,那么还是采用轮训分发的方式将会影响机器的运作效率。所以我们需要采用不公平分发来实现能者多劳(就是处理消息快的处理的消息更多),从而达到机器运行效率的提高
  • 在消费者消费消息之前加入以下代码
channel.basicQos(1);

在这里插入图片描述

1.3.5、预取值

  • 这个值是在工作进程里面的代码进行设置的,channel.basicQos()中传入的参数就是我们设置的预取值。如果预取值是5,表示的是队列里面最多可以堆积5条消息。
  • 如果另一个工作进程将手上的消息处理完毕,预取值是5的工作进程队列中的消息不会被拿出来再给空闲的工作进程处理
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
11天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
88 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
78 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决