三、RabbitMQ Work Queues

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 三、RabbitMQ Work Queues

工作队列(又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。


1. 轮训分发消息


启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。


抽取工具类:获取信道


public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.234.00");
        factory.setUsername("admin");
        factory.setPassword("pwd...");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}


编写两个线程



public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
        String receivedMessage = new String(delivery.getBody());
        System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
        System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
public class Worker02 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
        String receivedMessage = new String(delivery.getBody());
        System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
        System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}


启动起来。


编写一个生产者线程


public class Task01 {
private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel=RabbitMqUtils.getChannel();) {
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台当中接受信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
        String message = scanner.next();
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息完成:"+message);
        }
        }
    }
}


将 生产者线程也启动起来。会看到 两个消费者在轮训的工作。


2. 消息应答


2.1 概念


消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。


RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制。


消息应答就是: 消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。


2.2 自动应答


消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。


2.3 手动应答


  • 肯定确认


A.Channel.basicAck


RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了


  • 否定确认


B.Channel.basicNack


  • 否定确认


C.Channel.basicReject


与 Channel.basicNack 相比少一个参数(批处理标识)


不处理该消息了直接拒绝,可以将其丢弃了


2.4 Multiple 的解释


手动应答的好处是可以批量应答并且减少网络拥堵



multiple 的 true 和 false 代表不同意思:


  • true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答


  • false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答



2.5 消息自动重新入队


如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。



2.6 消息手动应答代码


默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。



消息生产者


public class Task02 {
  private static final String TASK_QUEUE_NAME = "ack_queue";
  public static void main(String[] argv) throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
    //  参数:队列名称 是否持久化 是否共享 是否自动删除 其他参数
    channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    Scanner sc = new Scanner(System.in);
    System.out.println("请输入信息");
    while (sc.hasNext()) {
    String message = sc.nextLine();
    channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("生产者发出消息" + message);
    }
    }
  }
}


消费者01


public class Work03 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
   Channel channel = RabbitMqUtils.getChannel();
   System.out.println("C1 等待接收消息处理时间较短");
   //消息消费的时候如何处理消息
   DeliverCallback deliverCallback=(consumerTag,delivery)->{
   String message= new String(delivery.getBody());
   SleepUtils.sleep(1);
   System.out.println("接收到消息:"+message);
   /**
   * 1.消息标记 tag
   * 2.是否批量应答未应答消息
   */
   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   };
   //采用手动应答
   boolean autoAck=false;
   channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
   System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
   });
 }
}


消费者02


public class Work04 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C2 等待接收消息处理时间较长");
 //消息消费的时候如何处理消息
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(30);
 System.out.println("接收到消息:"+message);
 /**
 * 1.消息标记 tag
 * 2.是否批量应答未应答消息
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //采用手动应答
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 });
 }
}


睡眠工具类


public class SleepUtils {
  public static void sleep(int second){
    try {
    Thread.sleep(1000*second);
    } catch (InterruptedException _ignored) {
      Thread.currentThread().interrupt();
    }
  }
}


2.7 手动应答效果


正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理



在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 网络协议 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(上)
71 1
|
消息中间件 调度
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)(下)
126 1
|
消息中间件 网络协议
RabbitMQ从入门到进阶(Work Queues)
RabbitMQ从入门到进阶(Work Queues)
125 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
12天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
44 15
|
11天前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
39 9
|
7天前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
17 1
|
8天前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
21天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
24 0
手撸MQ消息队列——循环数组
下一篇
无影云桌面