消费者1
package com.atguigu.rabbitmq.three; import com.atguigu.rabbitmq.utils.RabbitMqutils; import com.atguigu.rabbitmq.utils.SleepUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Work03 { //队列的名称 public static final String TASK_QUEUE_NAME="闫文超ack"; //接受消息 public static void main(String[] args)throws Exception { Channel channel = RabbitMqutils.getChannel(); System.out.println("C1等待接受消息出来等待时间较短"); DeliverCallback deliverCallback=(var1, var2)->{ //沉睡1s SleepUtils.sleep(1); System.out.println("接受到的消息"+new String(var2.getBody())); //手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 fasle不批量应答信道中的消息 true批量 */ channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback= var1->{ System.out.println(var1+"消费者取消的消费接口回调"); }; //采用手动应答 boolean autAck=false;//不是自动的 channel.basicConsume(TASK_QUEUE_NAME,autAck,deliverCallback,cancelCallback); } }
消费者2
package com.atguigu.rabbitmq.three; import com.atguigu.rabbitmq.utils.RabbitMqutils; import com.atguigu.rabbitmq.utils.SleepUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Work04 { //队列的名称 public static final String TASK_QUEUE_NAME="闫文超ack"; //接受消息 public static void main(String[] args)throws Exception { Channel channel = RabbitMqutils.getChannel(); System.out.println("C2等待接受消息出来等待时间较长"); DeliverCallback deliverCallback=(var1, var2)->{ //沉睡1s SleepUtils.sleep(30); System.out.println("接受到的消息"+new String(var2.getBody())); //手动应答 /** * 1.消息的标记 tag * 2.是否批量应答 fasle不批量应答信道中的消息 true批量 */ channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback= var1->{ System.out.println(var1+"消费者取消的消费接口回调"); }; //采用手动应答 boolean autAck=false;//不是自动的 channel.basicConsume(TASK_QUEUE_NAME,autAck,deliverCallback,cancelCallback); } }
手动应答但消费者2 死机的时候消息回到消费者1里面去处理消息 防止消息的丢失
3.2.7. 手动应答效果演示
正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了
3.3. RabbitMQ 持久化
3.3.1. 概念
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标
记为持久化。
3.3.2. 队列如何实现持久化
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
以下为控制台中持久化与非持久化队列的 UI 显示区、
这个时候即使重启 rabbitmq 队列也依然存在
3.3.3. 消息实现持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添
加这个属性。
消息持久化
package com.atguigu.rabbitmq.three; import com.atguigu.rabbitmq.utils.RabbitMqutils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; /** * * 消息应答时不丢失 放回队列重新消费 */ public class Task2 { public static final String TASK_QUEUE_NAME="闫文超ack"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqutils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列中的消息是否持久化 默认消息存储在内存中 否不持久化 (持久化放在磁盘中) *3.该队列是否只供一个消费者进行消费 是否共享 true 可以多个消费者消费 默认false,只能一个消费者消费 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不删除 5.其他参数 */ //声明队列 持久化 boolean abc=true; channel.queueDeclare(TASK_QUEUE_NAME,abc,false,false,null); //从控制输入的信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); //发消息了开始 /** * 发送一个消息 * 1.发送到那个交换机 此不考虑不写 * 2.路由key值是那个 本次是队列的名称 * 3.其他参数信息 *4.发送的消息 的消息体 发送要发送他的二进制 才能发送出去消息 */ //设置生产者发送消息为持久化消息(保存到磁盘中)一般保存在内存中 随时节能会丢失 channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8")); System.out.println("生产者发出消息的:"+message); } } }
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。
3.3.4. 不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
消费者加入
加入之后 沉睡1s的多做 沉睡30s的就做一个 发4个消息 1s3个 30s的一个
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
3.3.5. 预取值
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。**虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,**从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。