🏇 小肖来了
🍣 今天给大家带来的文章是《消息队列的 Work-Queues 篇》🍣
🍣 这是RabbitMQ的另一种模式🍣
🍣 希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏 博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗 同时也非常感谢各位小伙伴们的支持💗
1、Work Queues
- 工作队列的主要思想是避免立即执行密集型任务,而不得不等待它完成。我们把任务发送到队列中,后台进程将消息从队列中弹出执行,如果后台有多个工作进程的话,这些工作进程轮询(就是你一条、我一条、他一条)进行处理这些任务。
1.1、轮询发送消息
1.1.1、抽取工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtil {
/**
* 连接工厂获取信道的工具类
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Channel getChannel() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置IP地址
factory.setHost("192.168.123.129");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("123");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
return channel;
}
}
1.1.2、工作线程
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println(new String(var2.getBody()));
};
CancelCallback cancelCallback = var1->{
System.out.println(var1 + "消费消息被中断了");
};
/**
* 接收消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
1.1.3、制造多个工作线程
- 我们通过更改
IDEA的配置
进行启动多个工作线程。
1.1.4、生产者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
/**
* 声明队列
* 1.队列名称
* 2.队列里面的消息是否持久化(磁盘)默认情况下消息存储在内存中
* 3.该队列是否提供一个消费者进行消费,就是是否进行消息共享
* 4.就是当最后一个消费者断开连接之后,该队列是否自动删除消息
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的Key值,也就是本次队列的名称
* 3.其他参数信息
* 4.发送消息的消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息成功:" + message);
}
}
}
1.1.5、最终测试
1. 生产者发送消息
AA
发送消息成功:AA
BB
发送消息成功:BB
CC
发送消息成功:CC
DD
发送消息成功:DD
2. 测试结果
W1的控制台输出
W2的控制台输出
1.2、消息应答
1.2.1、概念
- 可能出现的问题:如果消费者完成一段任务需要一段时间,如果其中一个消费者处理一个很长的任务并只是完成了一部分就挂掉了,那么我们将丢失一部分消息。
- 为了保证消息在发送过程中不丢失,
rabbitmq
引入了消息应答机制。 - 消息应答就是:消费者在接收到消息并且处理该消息之后,告诉
rabbitmq
它已经处理了,rabbitmq
可以把消息删除了。
1.2.2、自动应答
消息发送后立即被认为已经发送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。这种模式仅适合于在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
1.2.3、手动应答的方法
// 用于肯定确认,RabbitMQ已知道消息并且成功的处理消息,可以将其丢弃了。
Channel.basicAck()
// 用于否定确认
Channel.basicNack()
Channel.basicReject()
// 后者比前者多一个参数,不处理消息了直接拒绝,可以将其对其
1.2.4、Multiple的解释
Multiple = true
表示可以批量应答,就好像到8
了,可以对5,6,7
都进行应答。
Multiple = true
表示不可以批量应答,就好像到8
了,只可以对8
进行应答,不可以对5,6,7
都进行应答。
1.2.5、消息自动重新入队
- 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或
TCP
连接丢失),导致消息未发送ACK
确认,RabbitMQ将了解到消息未完全处理,并将其重新入对。如果此时其他消费者可以处理,它将很快重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。 - 如果所示,
C1
收到消息1之后为发送ack
就断开链接了,其消息1会重新入队,并被C2
给处理。
1.2.6、生产者代码
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task2 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String next = sc.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,next.getBytes("UTF-8"));
System.out.println("成功发送的消息是:" + next);
}
}
}
1.2.7、消费者代码
1. 工作线程1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Work03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
System.out.println("C1正准备接受消息处理时间较短");
DeliverCallback deliverCallback = (var1,var2)->{
SleepUtils.sleep(1);
System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
/**
* 1. 消息的标签
* 2. 是否进行批量应答
*/
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
System.out.println("消息处理失败进行函数回调");
}));
}
}
2. 工作线程2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import com.xiao.utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Work04 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtil.getChannel();
System.out.println("C2正准备接受消息处理时间较长");
DeliverCallback deliverCallback = (var1,var2)->{
SleepUtils.sleep(30);
System.out.println("接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
/**
* 1. 消息的标签
* 2. 是否进行批量应答
*/
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,(var1->{
System.out.println("消息处理失败进行函数回调");
}));
}
}
3、工具类
public class SleepUtils {
public static void sleep(int seconds){
try {
seconds *= 1000;
Thread.sleep(seconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
1.2.8、效果演示
- 生产者分别发送消息:
aa、bb、cc、dd
。 - 当工作进程
2
接受dd
消息的时候宕机了,那么消息不会丢失,而是转发给工作进程1
。
- 生产者
- 工作进程1
- 工作进程2
1.3、持久化
1.3.1、概念
- 我们如何保障当RabbitMQ服务停止之后消息生产者发送过来的消息不丢失。确保消息不会丢失需要做两件事情:我们需要将队列和消息都标记为持久化。
1.3.2、队列如何实现持久化
- 在我们没有将队列设置为持久化状态的时候,如果rabbitmq重启的话,该队列就会被删除,如果队列要实现持久化,需要将声明队列的地方设置为持久化。
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
- 但是需要注意的是如果之前的队列不是持久化的,需要把原先的队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。
重启之后
1.3.3、消息如何进行持久化
- 要想让消息实现持久化需要将消息生产者代码修改,如下所示。
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
- 将消息标记为持久化并不能完全保证不会丢失消息尽管这里它告诉RabbitMQ将消息保存在磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘,持久性保证并不强。
1.3.4、不公平分发
- RabbitMQ分发消息默认采用轮训分发机制。但是如果我们的工作进程对任务的处理速度不一样的话,那么还是采用轮训分发的方式将会影响机器的运作效率。所以我们需要采用不公平分发来实现能者多劳(就是处理消息快的处理的消息更多),从而达到机器运行效率的提高。
- 在消费者消费消息之前加入以下代码
channel.basicQos(1);
1.3.5、预取值
- 这个值是在工作进程里面的代码进行设置的,
channel.basicQos()
中传入的参数就是我们设置的预取值。如果预取值是5
,表示的是队列里面最多可以堆积5
条消息。 - 如果另一个工作进程将手上的消息处理完毕,预取值是5的工作进程队列中的消息不会被拿出来再给空闲的工作进程处理。