消息持久化
我们已经学会了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,我们的消息仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要声明它是持久的
boolean durable = true; channel.queueDeclare("task_queue2", durable, false, false, null);
注意:
RabbitMQ不允许你用不同的参数重新定义一个现有的队列,并且会向任何试图这样做的程序返回一个错误。
这个queueDeclare更改需要同时应用于生产者和使用者代码。
此时我们可以确定task_queue队列不会丢失,即使RabbitMQ重新启动。现在我们需要将消息标记为持久消息——通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
公平调度
您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。RabbitMQ对此一无所知,它仍然会均匀地分发消息。
这是因为RabbitMQ只是在消息进入队列时分派消息。它不会查看消费者的未确认消息的数量。它只是盲目地将每n个消息发送给第n个消费者。
为了克服这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工作人员。
int prefetchCount = 1; channel.basicQos(prefetchCount);
消费者完整代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerOne * @description : [消费者1] * @createTime : [2023/1/17 9:07] * @updateUser : [WangWei] * @updateTime : [2023/1/17 9:07] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerOne { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); int prefetchCount = 1; channel.basicQos(prefetchCount); //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } /* * @version V1.0 * Title: doWork * @author Wangwei * @description 模拟任务的执行时间 * @createTime 2023/1/18 9:21 * @param [task] * @return void */ private static void doWork(String task) { for (char ch : task.toCharArray()) { //如果消息中存在.则1秒之后继续,有几个.停止几秒 if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
验证公平调度
现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。
生产者完整代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : Producer * @description : [生产者1] * @createTime : [2023/1/17 8:48] * @updateUser : [WangWei] * @updateTime : [2023/1/17 8:48] * @updateRemark : [描述说明本次修改内容] */ public class Producer { private static final String TASK_QUEUE_NAME = "task_queue1"; public static void main(String[] args) throws IOException, TimeoutException { //连接RabbitMQ RabbitMQUtils.getConnection(); //获取信道 Channel channel = RabbitMQUtils.getChannel(); /* * TASK_QUEUE_NAME 队列名称 * durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息 * exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除, (这里需要注意三点:1.排他队列是基于连接Connection可见的, 同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的, 这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出, 该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景) *autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列, 之后所有与这个队列连接的消费者都断开时,才会自动删除。 不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除", 因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列 *arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等 * * */ channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); //发送10条消息 for (int i = 0; i <10 ; i++) { String message="廊坊师范..."+i; /* *交换机命名,不填写使用默认的交换机 * routingKey -路由键道具-消息的其他属性-路由头等正文 * 消息正文 * **/ //推送消息 channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消费者1完整代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerOne * @description : [消费者1] * @createTime : [2023/1/17 9:07] * @updateUser : [WangWei] * @updateTime : [2023/1/17 9:07] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerOne { private static final String TASK_QUEUE_NAME = "task_queue1"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //channel.basicQos(1); //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } /* * @version V1.0 * Title: doWork * @author Wangwei * @description 模拟任务的执行时间 * @createTime 2023/1/18 9:21 * @param [task] * @return void */ private static void doWork(String task) { for (char ch : task.toCharArray()) { //如果消息中存在.则1秒之后继续,有几个.停止几秒 if (ch == '.') { try { Thread.sleep(3000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
消费者2完整代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerTwo * @description : [消费者2] * @createTime : [2023/1/17 9:10] * @updateUser : [WangWei] * @updateTime : [2023/1/17 9:10] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerTwo { private static final String TASK_QUEUE_NAME = "task_queue1"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //channel.basicQos(1); //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } /* * @version V1.0 * Title: doWork * @author Wangwei * @description 模拟任务的执行时间 * @createTime 2023/1/18 9:21 * @param [task] * @return void */ private static void doWork(String task) { for (char ch : task.toCharArray()) { //如果消息中存在.则1秒之后继续,有几个.停止几秒 if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
执行结果:
消费者1:
消费者2:
结论:
有两个消费者的情况下,一个消费者1一直很忙(完成一条消息需要3秒),消费者2一致很轻松(完成一条消息需要1秒)。
RabbitMQ对此一无所知,它仍然会均匀地分发消息。导致消费者1和消费者2执行了同样的消息数量。
现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**
消费者1完整代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerOne * @description : [消费者1] * @createTime : [2023/1/17 9:07] * @updateUser : [WangWei] * @updateTime : [2023/1/17 9:07] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerOne { private static final String TASK_QUEUE_NAME = "task_queue1"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } /* * @version V1.0 * Title: doWork * @author Wangwei * @description 模拟任务的执行时间 * @createTime 2023/1/18 9:21 * @param [task] * @return void */ private static void doWork(String task) { for (char ch : task.toCharArray()) { //如果消息中存在.则1秒之后继续,有几个.停止几秒 if (ch == '.') { try { Thread.sleep(3000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
消费者2完整代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author : [WangWei] * @version : [v1.0] * @className : ConsumerTwo * @description : [消费者2] * @createTime : [2023/1/17 9:10] * @updateUser : [WangWei] * @updateTime : [2023/1/17 9:10] * @updateRemark : [描述说明本次修改内容] */ public class ConsumerTwo { private static final String TASK_QUEUE_NAME = "task_queue1"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtils.getConnection(); Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } /* * @version V1.0 * Title: doWork * @author Wangwei * @description 模拟任务的执行时间 * @createTime 2023/1/18 9:21 * @param [task] * @return void */ private static void doWork(String task) { for (char ch : task.toCharArray()) { //如果消息中存在.则1秒之后继续,有几个.停止几秒 if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
执行结果:
消费者1:
消费者2:
结论:
置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的消费者。
总结
学习一门知识需要亲自动手去验证去证明这种方式是可行了,这样对于这个知识点才算是理解的更深。按照这样做一定行,而不是应该可以,大概可以吧。