三、RabbitMQ Work Queues

简介: 三、RabbitMQ Work Queues

工作队列(又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。


1. 轮训分发消息


启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。


抽取工具类:获取信道


public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.234.00");
        factory.setUsername("admin");
        factory.setPassword("pwd...");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}


编写两个线程



public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
        String receivedMessage = new String(delivery.getBody());
        System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
        System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
public class Worker02 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
        String receivedMessage = new String(delivery.getBody());
        System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
        System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}


启动起来。


编写一个生产者线程


public class Task01 {
private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel=RabbitMqUtils.getChannel();) {
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台当中接受信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
        String message = scanner.next();
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息完成:"+message);
        }
        }
    }
}


将 生产者线程也启动起来。会看到 两个消费者在轮训的工作。


2. 消息应答


2.1 概念


消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。


RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制。


消息应答就是: 消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。


2.2 自动应答


消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。


2.3 手动应答


  • 肯定确认


A.Channel.basicAck


RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了


  • 否定确认


B.Channel.basicNack


  • 否定确认


C.Channel.basicReject


与 Channel.basicNack 相比少一个参数(批处理标识)


不处理该消息了直接拒绝,可以将其丢弃了


2.4 Multiple 的解释


手动应答的好处是可以批量应答并且减少网络拥堵



multiple 的 true 和 false 代表不同意思:


  • true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答


  • false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答



2.5 消息自动重新入队


如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。



2.6 消息手动应答代码


默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。



消息生产者


public class Task02 {
  private static final String TASK_QUEUE_NAME = "ack_queue";
  public static void main(String[] argv) throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
    //  参数:队列名称 是否持久化 是否共享 是否自动删除 其他参数
    channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    Scanner sc = new Scanner(System.in);
    System.out.println("请输入信息");
    while (sc.hasNext()) {
    String message = sc.nextLine();
    channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("生产者发出消息" + message);
    }
    }
  }
}


消费者01


public class Work03 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
   Channel channel = RabbitMqUtils.getChannel();
   System.out.println("C1 等待接收消息处理时间较短");
   //消息消费的时候如何处理消息
   DeliverCallback deliverCallback=(consumerTag,delivery)->{
   String message= new String(delivery.getBody());
   SleepUtils.sleep(1);
   System.out.println("接收到消息:"+message);
   /**
   * 1.消息标记 tag
   * 2.是否批量应答未应答消息
   */
   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   };
   //采用手动应答
   boolean autoAck=false;
   channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
   System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
   });
 }
}


消费者02


public class Work04 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C2 等待接收消息处理时间较长");
 //消息消费的时候如何处理消息
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(30);
 System.out.println("接收到消息:"+message);
 /**
 * 1.消息标记 tag
 * 2.是否批量应答未应答消息
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //采用手动应答
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 });
 }
}


睡眠工具类


public class SleepUtils {
  public static void sleep(int second){
    try {
    Thread.sleep(1000*second);
    } catch (InterruptedException _ignored) {
      Thread.currentThread().interrupt();
    }
  }
}


2.7 手动应答效果


正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理



在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 网络协议 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
89 1
|
消息中间件 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
141 1
|
消息中间件 网络协议
RabbitMQ从入门到进阶(Work Queues)
RabbitMQ从入门到进阶(Work Queues)
138 0
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
168 6
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
110 10
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
3月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
80 4