结构
Hello World
consumer
package one; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消费者 */ public class Consumer { public static final String QUEUE_NAME = "hello"; // 接受消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.14.165.91"); factory.setPassword("123"); factory.setUsername("admin"); // 进行连接 Connection connection = factory.newConnection(); // 链接成功之后创建一个信道 Channel channel = connection.createChannel(); // 消费者消费消息 /** * 参数 * 1.消费哪个队列 * 2.消费成功之后,是否要自动应答,true表示自动应答, 否则false * 3.未消费成功的回调方法 * 4.消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() { @Override public void handle(String var1, Delivery var2) throws IOException { String msg = new String(var2.getBody()); System.out.println(msg); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("发生错误:" + s); } }); } }
producer
package one; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static final String QUEUE_NAME = "hello"; // 发消息 public static void main(String[] args) throws IOException, TimeoutException { // 创建一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 工厂的ip, 链接rabbit队列 factory.setHost("106.14.165.91"); factory.setUsername("admin"); factory.setPassword("123"); // 建立连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel1 = connection.createChannel(); // 生成一个队列 /** * 第一个参数: 队列名称 * 第二个参数: 消息是否持久化, true表示存储在磁盘上, 否则表示存储在内存中(默认) * 第三个参数: 该队列是否消息共享, true表示可以多个消费者消费, 否则只能一个消费者消费 * 第四个参数: 是否自动删除, 最后一个消费者断开连接之后, 该队列是否自动删除,true表示自动删除 * 其他参数: */ channel1.queueDeclare(QUEUE_NAME,false,false,false,null); long nextPublishSeqNo = channel1.getNextPublishSeqNo(); System.out.println(nextPublishSeqNo); // 发送消息 String msg = "hello world"; /** 参数列表 * 1 : 发送到哪个交换机 * 2 : 路由的key值, 本次是队列名称 * 3 : 其他参数 * 4 : 消息体 */ channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes()); long nextPublishSeqNo2 = channel1.getNextPublishSeqNo(); System.out.println(nextPublishSeqNo); channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes()); long nextPublishSeqNo3 = channel1.getNextPublishSeqNo(); System.out.println("消息发送完毕over"); } }
创建连接API解析
官方api网址:
Connection:publisher/consumer和broker之间的TCP连接, Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。
Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销.
创建连接工厂
创建一个连接工厂之后, 设置对应Rabbitmq在哪个服务器上面, 并提供安全访问的验证.
在建立连接工厂之后进行连接, 就可以使用工厂创建连接.
ConnectionFactory factory = new ConnectionFactory(); // 工厂的ip, 链接rabbit队列 factory.setHost("106.14.165.11"); factory.setUsername("usr"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
创建链接之后就可以使用这个链接的对象来创建channel.
生产者生产消息
生产者生产消息, 然后通过channel发送给队列. 通过创建的channel对象, 调用其中的basicPublish方法来将消息发送给队列.
basicPublish
是 RabbitMQ 中用于发布消息到指定交换机的方法。它的主要作用是允许生产者将消息发送到 RabbitMQ 的交换机,然后交换机根据路由规则将消息发送到相应的队列中,以供消费者消费。
basicPublish参数解析:
basicPublish有三个重载版本:
void basicPublish(String exchange , String routingKey , AMQP.BasicProperties props, byte[] body ) throws IOException;
- exchange : 指定要发送的交换机的名称, 如果设置空字符串, 那么消息会被发送到RabbitMQ的默认交换机.
- routingKey : 路由键, 用于指定消息要路由到的队列.
- props : 消息的属性, 这是一个可选参数, 里面有: 消息类型, 格式, 优先级, 过期时间等等
- body : 消息体, 也就是要发送的消息本身
exchange这个参数, 如果指定默认的交换机, 也就是如下图所示:
void basicPublish(String exchange, String routingKey, boolean var3, AMQP.BasicProperties props, byte[] body) throws IOException;
- exchange和
routingKey
:与第一个方法中的意义相同,分别是交换机名称和路由键。 var3
(boolean):是否强制路由(mandatory routing)。如果设置为true
,并且消息无法路由到任何队列(没有匹配的绑定),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被丢弃。- props和 body:与第一个方法中的意义相同,分别是消息属性和消息体。
void basicPublish(String var1, String var2, boolean var3, boolean var4, AMQP.BasicProperties var5, byte[] var6) throws IOException;
var1
和var2
:与前两个方法中的意义相同,分别是交换机名称和路由键。var3
(boolean):是否强制路由,与第二个方法中的意义相同。var4
(boolean):是否立即发布(immediate flag)。如果设置为true
,并且消息无法路由到任何消费者(没有匹配的队列或消费者不在线),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被存储在队列中等待消费者。var5
和var6
:与第一个方法中的意义相同,分别是消息属性和消息体。
需要注意的是, 如果你指定默认的交换机, 也就是default交换机, 那么第二个参数routingKey的意思就变成了queue了, 也就是第二个参数改为 对应的队列的名称.
消费者消费消息
消费者消费消息的方法为basicConsume() 这个方法有很多个重载, 如下:
地址: Channel (RabbitMQ Java Client 5.20.0 API)
这里只讲解最常见的, 也是初学者最常用的一个方法:
basicConsume( String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback )
参数解析:
- String queue:
- 这个参数指定了消费者要从中接收消息的队列名称。
- boolean autoAck:
- 这个参数决定了是否自动确认消息。如果设置为
true
,则一旦消息被交付给消费者,RabbitMQ 会自动将其标记为已确认,即使消费者还没有实际处理完这条消息。这种模式下,如果消费者在处理消息时崩溃或发生错误,那么这条消息就会丢失,因为 RabbitMQ 认为它已经被成功处理了。 - 如果设置为
false
,则消费者需要显式地调用basicAck
方法来确认消息已被成功处理。这样,如果消费者在处理消息时崩溃,RabbitMQ 会重新将这条消息放回队列中,等待其他消费者处理,从而保证了消息的可靠性。
- Map arguments:
- 这个参数允许你传递额外的参数到消费者,这些参数可以用来配置消费者的行为。例如,你可以使用它来设置消费者标签(consumer tag),该标签用于唯一标识这个消费者,或者在后续的操作中引用它。
- DeliverCallback deliverCallback:
- 这是一个回调函数,当 RabbitMQ 向消费者发送消息时,会自动调用这个回调。回调函数通常包含处理消息的逻辑,比如解析消息内容、执行业务逻辑等。
- 回调函数的参数通常包含消息的内容、消息的元数据(如消息的交换机、路由键、消息ID等)以及一个通道(Channel)对象,通过这个通道对象,消费者可以发送消息确认、拒绝消息或进行其他操作。
- CancelCallback cancelCallback:
- 这是一个可选的回调函数,当消费者被取消(例如,由于连接断开或消费者显式地调用
basicCancel
)时,会自动调用这个回调。这个回调可以用于执行清理工作,比如释放资源、记录日志等。
下面是 DeliverCallback 和CancelCallback 两个接口的代码:
@FunctionalInterface public interface DeliverCallback { void handle(String var1, Delivery var2) throws IOException; }
@FunctionalInterface public interface CancelCallback { void handle(String var1) throws IOException; }
我们需要重写里面的handle方法, 示例如下:
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() { @Override public void handle(String var1, Delivery var2) throws IOException { // ... } }, new CancelCallback() { @Override public void handle(String s) throws IOException { // ... } });
当然你也可以使用lambda表达式;
队列声明
生产者使用的是basicPublish来将消息推送至队列, 也就是:
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
但是存在一个问题, 如果你basicPublish指定的交换机不存在? 那么你推送消息到你指定的交换机, 就会发生异常, 所以除非你的RabbitMQ-server本地已经创建了这个交换机, 那么就不需要其他操作, 但是如果你没有你指定的名称的交换机, 那么就应该去声明一个交换机.
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
queueDeclare()
也就是方法: queueDeclare() , 它可以指定参数, 也可以不指定, 下面是他们的解释:
queueDeclare
是 RabbitMQ Java 客户端库中用于声明队列的方法。这个方法有两个版本,一个不带参数,另一个带有多个参数以提供队列的详细配置。下面我将详细解释这两个方法及其参数的作用。
第一个方法:queueDeclare()
Actively declare a server-named exclusive, autodelete, non-durable queue.
这个方法不带任何参数。当你调用这个方法时,RabbitMQ 会为你创建一个新的队列,该队列的名称将由 RabbitMQ 自动生成,并且这个队列是非持久的、排他的、自动删除的,且不带任何额外的参数。
由于没有指定队列名称,你通常无法预先知道队列的确切名称,这可能会在某些场景下造成不便,比如当你需要多个消费者共享同一个队列时。此外,由于队列是非持久的,如果 RabbitMQ 服务器重启,这个队列将会丢失,所有在队列中的消息也会丢失。
第二个方法:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
这个方法允许你更细致地配置队列的属性。下面是每个参数的解释:
- String queue:
- 队列的名称。这个名称必须是非空的,并且在 RabbitMQ 服务器上是唯一的。
- boolean durable:
- 是否持久化队列。如果设置为
true
,队列会在 RabbitMQ 服务器重启后依然存在。如果设置为false
,队列则是非持久的,服务器重启后队列将不存在。
- boolean exclusive:
- 是否排他。如果设置为
true
,队列只能被声明它的连接使用,并且当连接关闭时,队列会被自动删除。这通常用于临时队列。
- boolean autoDelete:
- 是否自动删除。如果设置为
true
,当最后一个消费者断开连接后,队列会自动删除。如果设置为false
,则不会自动删除队列。
- Map arguments:
- 一组额外的队列参数,可以用来设置队列的更多高级特性。例如,你可以设置队列的最大长度、消息生存时间等。
对比两个方法
第一个方法(无参数版本)非常简单易用,但功能有限。它适用于那些不需要复杂队列配置的场景,比如临时测试或简单应用。然而,由于它创建的队列是非持久的,且名称不可预知,因此它可能不适用于需要持久化存储或精确控制队列名称的场景。
第二个方法(带参数版本)提供了更丰富的队列配置选项,使得你可以更精确地控制队列的行为。通过设置不同的参数,你可以创建持久化队列、排他队列、自动删除队列,以及带有额外属性的队列。这使得这个方法适用于那些需要复杂队列配置和高级特性的场景。
在实际应用中,你应该根据应用的需求来选择使用哪个方法。如果你只是需要一个简单的、临时的队列来传递消息,那么无参数版本可能足够了。但如果你需要确保队列的持久性、控制队列的名称、设置队列的额外属性等,那么你应该使用带参数版本。
对于消费者同样如此
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
这个方法确实是用于声明(或创建)一个队列的。在RabbitMQ中,队列的声明是一个幂等操作,这意味着即使队列已经存在,再次声明它也不会产生错误或导致任何不期望的行为。
当调用这个方法时,RabbitMQ会检查是否已经存在具有相同名称的队列:
- 如果队列不存在:RabbitMQ会根据提供的参数(如
durable
、exclusive
、autoDelete
和arguments
)创建一个新的队列。 - 如果队列已经存在:RabbitMQ会忽略声明请求中的大多数参数(除了
exclusive
和autoDelete
,这两个参数仅在首次声明队列时生效),并返回队列的当前属性。重要的是要注意,即使队列已经存在,durable
标志也不会影响现有队列的持久性。如果队列在原始声明时是持久的,那么它将继续是持久的,即使后续的声明将其标记为非持久的。
因此,如果你尝试声明一个已经存在的队列,RabbitMQ不会报错或采取任何特别的行动,除了验证提供的exclusive
和autoDelete
标志是否与原始声明一致(如果不一致,操作会失败)。其他参数(如durable
)将不会影响已存在的队列(但是不报错并不是绝对的, 这个需要根据版本说明去判断, 不能肯定它不报错)。
最后,需要注意的是,虽然声明队列本身不会抛出IOException
,但如果在与RabbitMQ服务器通信时发生网络问题或其他I/O问题,这个方法可能会抛出IOException
。因此,在实际使用中,你应该妥善处理这些潜在的异常。
工作队列Work Queues
工作队列, 主要是避免立即执行资源密集型任务, 而不得不等待它完成, 相反我们安装任务之后执行, 我们把任务封装为消息并将其发送给队列, 在后台运行的工作进程将弹出任务并最终执行作业, 当有多个线程工作时, 这些工作线程讲义气处理这些任务.
RabbitMQ的工作队列(Work Queue)是一种消息队列模式,它允许你将任务(通常表示为消息)分发给多个消费者(工作进程)进行并行处理。这种模式特别适用于那些可以并行处理且不需要按照特定顺序完成的任务。
在工作队列模式中,生产者发送消息到队列中,一个或多个消费者从队列中接收并处理这些消息。每个消息都会被一个消费者处理,并且通常不会被多个消费者处理(除非有明确的路由或复制逻辑)。这种模式非常适合用于处理后台任务,如批量电子邮件发送、日志处理、图像处理等。
RabbitMQ的工作队列模式有以下几个关键特点:
- 任务分发:生产者将任务作为消息发送到队列中。RabbitMQ负责将消息从队列中取出并分发给一个或多个消费者。分发通常基于消息的先进先出(FIFO)顺序,但也可以通过其他策略(如优先级队列)进行定制。
- 并行处理:多个消费者可以同时从队列中接收消息并处理任务。这使得任务可以并行执行,从而提高了整体的处理速度。
- 消息确认:为了确保消息的可靠处理,消费者通常在处理完消息后会向RabbitMQ发送一个确认信号(ack)。这样,即使消费者在处理消息时崩溃,RabbitMQ也可以将未确认的消息重新放回队列中,等待其他消费者处理。这种机制保证了消息的可靠性。
- 持久化:通过配置队列和消息的持久化属性,可以确保即使在RabbitMQ服务器重启后,消息也不会丢失。这对于处理重要任务至关重要。
- 扩展性:工作队列模式具有很好的扩展性。你可以根据需要添加更多的消费者来处理更多的任务,从而轻松应对负载的增加。
使用RabbitMQ的工作队列模式,你可以构建高效、可靠且可扩展的后台任务处理系统,以满足各种应用场景的需求。
下面我们来一一列举出案例来解析工作队列的特性....
公平分发
RabbitMQ 在默认情况下,其分发机制是公平的,它试图将消息平均地分发给各个消费者,确保每个消费者都有机会处理大致相同数量的消息。这种分发并不是随机的,而是按照一定的顺序或规则进行。
但是这种分发模式会有一个很大的问题, 那么就是如果一个消费者处理消息的速度慢, 一个快, 那么就会有一个消费者产生饥饿的情况, 而另外一个消费者非常忙碌, 严重的队列会出现消息积压的情况. 此时产生饥饿的消费者没有完全利用cpu来消费消息, 所以就产生了资源的浪费, 为了避免这个情况
在 RabbitMQ 中,如果消息被平均分发到多个消费者(如消费者a和b),但消费者的处理速度不同(如a处理速度很快,b处理速度很慢),那么未被消费的消息会继续保留在队列中,等待消费者处理。具体来说,当消费者a迅速处理完自己的消息后,它会继续从队列中获取并处理新的消息(如果有的话)。而消费者b由于处理速度慢,它还未消费完的消息会留在队列中,等待其逐渐处理。
RabbitMQ 本身并没有为每个消费者设置单独的缓存来存储未处理的消息。消息的处理和存储都是在队列层面进行的。队列是消息的缓冲区,它负责存储和分发消息给消费者。消费者按照自己的速度从队列中拉取(或在某些配置下由队列推送)消息进行处理。
为了解决这种情况, 可以使用basicQos(1)方法来设置每个消费者同时只能消费一个消息, 这个设置将会告诉队列, 给我发送的消息, 同时不能超过一个 , 或者说是"别给我发送消息, 除非我上一个消息已经处理并应答", 同时, 他会将第二个消息发送给另外一个空闲的消费者来处理.
int prefetchCount = 1; channel.basicQos(prefetchCount);
但是如果所有的消费者都处于忙碌状态, 消息无法即使处理, 那么如果你还有必要维护这个队列, 那么推荐您多创建几个消费者去消费.
轮训分发
首先创建两个消费者, 创建一个生产者, 看看他们之间的任务是如何分配的:
消费者1 :
package MutiThreadWorkQueue; import Util.RabbitMQUtil; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import java.io.IOException; /** * 工作队列1 * 也就是消费者1 */ public class Worker1 { // 接收消息 public static void main(String[] args) throws IOException { Channel channel = RabbitMQUtil.getChannel(); // sout System.out.println("worker1 : "); // 接收消息 channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("worker1: " + new String(delivery.getBody())); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("worker1 发生错误"); } }); } }
消费者2同消费者1一样, 只不过里面的一些向控制台输出的提示信息发生了一些修改, 例如:
System.out.println("worker2 : ");
生产者
package MutiThreadWorkQueue; import Util.RabbitMQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.Scanner; public class Producer { public static void main(String[] args) throws IOException { Channel channel = RabbitMQUtil.getChannel(); // 从控制台输入 接收信息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("",RabbitMQUtil.QUEUE_NAME,null,msg.getBytes()); System.out.println("发送消息: " + msg + ",发送完毕"); } } }
RabbitMQUtil :
package Util; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil { public static final String QUEUE_NAME = "hello"; public static Channel getChannel() { ConnectionFactory factory = new ConnectionFactory(); // 工厂的ip, 链接rabbit队列 factory.setHost("106.14.165.91"); factory.setUsername("admin"); factory.setPassword("123"); // 建立连接 Connection connection = null; try { connection = factory.newConnection(); return connection.createChannel(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } // 获取信道 } }
查看rabbitmq是否已经存在RabbitMQUtil.QUEUE_NAME这个字符串对应的队列:
已经存在, 直接启动生产者和消费者, 然后在生产者中多次输入信息:
查看消费者1和消费者2 :
如果你多次重复的去实验你就会发现, 总是奇数的在woker1或者是woker2.
为什么?
我们首先看看消费者的消费的代码:
channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("worker1: " + new String(delivery.getBody())); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("worker1 发生错误"); } });
然后根据上面的参数解析, 可以发现, 其实他是使用的这个构造方法:
这个构造方法有什么特别之处? 那就是它没有指定exclusive, 也就是没有指定它是否是排他的, 但是不设置就不代表没有隐式设置.
官方文档给出的这个构造方法的描述是:
也就是non-sxclusive, 我们知道exclusive是排他的意思, 那么non-exclusive就是不拍他的, 也就是说, 消费的时候允许其他消费者一起共享处理. 但是每一个任务只能分发给一个消费者.
消息应答
首先你得确认一个东西, 那就是, 消息从生产者这里发送出去, 就可以不管了吗, 队列将消息分配给消费者, 就可以不管了吗, 当然不是, 还需要使用一种应答机制, 你可以将它和TCP协议的应答报文机制和超时重传进行一个对比.
RabbitMQ 的消息应答机制是一个确保消息在发送和接收过程中可靠性的重要手段。这种机制主要用于处理消费者在处理消息时可能出现的异常情况,如消费者在处理消息过程中宕机,导致消息丢失。
RabbitMQ 一旦向消费者传递了一条消息,通常会将该消息标记为已发送。然而,如果消费者在处理消息的过程中发生宕机,未处理的消息可能会丢失。为了保证消息在发送过程中不丢失,RabbitMQ 引入了消息应答机制。
消息应答机制的工作原理是:消费者在接收到消息并且处理完该消息之后,会向 RabbitMQ 发送一个确认信号,告诉 RabbitMQ 它已经处理了该消息,此时 RabbitMQ 可以安全地将该消息从队列中删除。
RabbitMQ 提供了两种消息应答模式:
- 自动应答(Auto Acknowledgment):在这种模式下,一旦消息被消费者接收,RabbitMQ 会立即将消息标记为已被消费,而不需要消费者明确地向 RabbitMQ 发送确认。这种模式对消息的处理时机和可靠性要求不高,可以容忍一定程度的消息丢失。但是,如果消费者在处理消息的过程中发生错误,消息仍然会从队列中删除,这可能导致消息丢失。
- 手动应答(Manual Acknowledgment):在手动应答模式下,消费者在处理完消息之后,需要向 RabbitMQ 发送明确的确认信号。这种模式下,消费者可以更精确地控制消息的删除时机,只有在确认消息已经成功处理后才通知 RabbitMQ 删除消息。这有助于防止因消费者处理错误或宕机而导致的消息丢失。
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
在实际使用中,可以根据应用的需求和消息的重要性来选择适合的消息应答模式。对于要求消息可靠传递的场景,建议使用手动应答模式;而对于对消息丢失容忍度较高的场景,可以选择自动应答模式以提高处理效率。
对于自动应答, 你想想,你有没有在什么地方见过?? 那肯定见过呀, 就在我们的api解析中, basicConsume的构造方法中, 存在一个参数, 名为AutoAck, 也就是Auto acknowledgement, 自动确认. 此时, 如果你设置为true, 那么就表明他是自动确认, 当队列将消息发送给消费者, 只要消费者接收到消息之后, RabbitMQ会立即将消息标记为已消费, 然后删除.
但是这样不安全, 得换一个更安全的方法 : 手动应答.
开启消息应答(手动) 你首先需要设置消费者的消费autoack为false :
官方有这样一句话描述basicAck:
简而言之就是, 如果你没有进行手动应答, 虽然是一个很容易犯的错误, 但是他会造成严重的后果, 也就是当你的客户端退出的时候消息会被重新推送(就像一些消息被无规则的推送), 但是RabbitMQ将会占用越来越多的内存, 这是因为这些消息没有得到正确的处理.
接下来我们看看basicAck这个方法的声明:
// Acknowledge one or several received messages. basicAck(long deliveryTag, boolean multiple)
参数解析:
- deliveryTag: 这是一个长整型(
long
)参数,代表要确认的消息的投递标签(delivery tag)。投递标签是 RabbitMQ 在发送消息给消费者时附带的,用于唯一标识这个消息。通过确认特定的投递标签,消费者可以告诉 RabbitMQ 它已经处理了哪个消息。 - multiple : 这是一个布尔型(
boolean
)参数,指示是否确认一个投递标签范围内的多个消息。如果multiple
设置为true
,则 RabbitMQ 会将投递标签小于或等于指定deliveryTag
的所有未确认消息标记为已确认。如果multiple
设置为false
,则仅确认具有指定deliveryTag
的单个消息。
那么, 这个deliveryTag和multiple从哪里来? 还记得处理接口DeliverCallback 吗, 每次消息队列向这个消费者发送消息, 消费者就会调用这个接口.
DeliverCallback
在 RabbitMQ 的 Java 客户端中是一个回调接口,用于处理从 RabbitMQ 队列接收到的消息。当 RabbitMQ 服务器向消费者发送消息时,它会调用这个回调接口,并将消息作为参数传递给 DeliverCallback
的实现方法。
具体来说,DeliverCallback
的实现方法接收两个参数:
consumerTag
:这是一个唯一标识消费者的标签,用于在多个消费者之间区分不同的消费实例, 是队列发送给消费者的时候自动为消费者分配的。delivery
:这是一个Delivery
对象,它包含了从 RabbitMQ 接收到的消息的内容以及其他相关信息,如消息的包体(body)、消息的头部(headers)、消息的投递标签(delivery tag)等。
DeliverCallback
不是一个缓存。它仅仅是一个回调函数,用于实时处理从 RabbitMQ 服务器接收到的消息。每当有新消息到达时,RabbitMQ 就会调用这个回调函数,并将消息传递给它。因此,你的消费者代码需要在 DeliverCallback
的实现中编写处理消息的逻辑。
例如,在上面的代码示例中,当接收到消息时,DeliverCallback
的实现会打印出消息内容,模拟一些处理过程(在这个例子中是等待两秒),然后发送一个确认信号给 RabbitMQ,告诉它消息已经被成功处理。
需要注意的是,DeliverCallback
的实现应该尽可能快地处理消息并发送确认信号,以避免消息在队列中堆积。如果处理消息的过程非常耗时,或者有可能失败,你可能需要考虑使用更复杂的错误处理机制,比如重试逻辑、死信队列等。
channel.basicConsume(TASK_QUEUEN_NAME, false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("c1 消费者接收到 取消接口消费回调逻辑"); } });
至于multiple, 批量应答以减少网络拥堵:
处理消息:
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
发布确认
为了保证消息被安全的发送给broker, 也就是RabbitMQ队列, 你应该需要使用到一些策略, 来让发布确认生效.
开启发布确认模式
Channel channel = connection.createChannel(); channel.confirmSelect();
发布确认模式是, AMQP 0.9.1 协议对于RabbitMQ的扩展, 所以发布确认模式不是默认启动的, 发布确认需要再channel 频道开启, 使用上述的confirmSelect()方法来开启发布确认.
开启发布确认之后, producer每次发送消息之后, 都会遵循相应的确认策略, 可以单个确认, 也可以批量确认, 下面是发布确认的一些常用确认方法:
- waitForConfirms()
这个方法会阻塞当前线程,直到自上次调用此方法以来发布的所有消息都被Broker确认(ack)或拒绝(nack)。如果没有设置超时时间,它可能会无限期地等待,直到所有消息都被处理。
返回类型是boolean
,但在大多数情况下,此方法可能会因为阻塞而不返回任何值。实际上,其返回值的意义可能取决于具体的RabbitMQ客户端库实现,但通常这种同步等待方法不会使用其返回值来进行流控制或错误处理。
2. waitForConfirms(long timeout)
与上一个方法类似,这个方法也会阻塞当前线程,等待Broker对消息的确认或拒绝。但是,它接受一个超时参数timeout
,表示等待的最大时间(以毫秒为单位)。如果在指定的超时时间内Broker没有对所有消息进行确认或拒绝,那么该方法将停止等待并返回。
返回类型是boolean
,但同样,返回值的意义可能取决于具体的RabbitMQ客户端库实现。通常,如果所有消息都在超时前得到了确认,则返回true
;如果超时了,则返回false
。
3. waitForConfirmsOrDie()
这个方法的行为与waitForConfirms()
类似,也会阻塞当前线程,等待Broker对所有消息进行确认或拒绝。但是,如果Broker没有对所有消息进行确认或拒绝,那么这个方法不会返回,而是会抛出异常(通常是运行时异常),导致当前线程终止。
由于这个方法可能导致线程终止,因此它通常用于那些对消息确认有严格要求的场景,并且愿意在消息未得到确认时让整个程序失败。
4. waitForConfirmsOrDie(long timeout)
这个方法结合了waitForConfirms(long timeout)
和waitForConfirmsOrDie()
的特点。它会在指定的超时时间内等待Broker对所有消息进行确认或拒绝。如果超时时间到了,而Broker还没有对所有消息进行确认或拒绝,那么这个方法会抛出异常,导致当前线程终止。
这种方法在需要确保消息被处理但又不想无限期等待的情况下非常有用。它允许设置一个合理的超时时间,以便在消息处理失败时能够及时地采取其他措施。
单独的发送消息
while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); // uses a 5 second timeout channel.waitForConfirmsOrDie(5_000); }
上面的例子中我们发送了一个消息, 然后等待他的确认(waitForConfirmsOrDie(5_000)), 这个方法将会再消息得到队列的确认之后返回, 如果消息没有在指定time内确认, 或者是由于某些原因队列无法返回确认消息(比如网络原因) , 那么该方法就会抛出异常, 这种异常的处理一般是记录日志, 或者重新将消息发送.
不同的客户端库拥有不同的方法区同步处理发布者确认模式, 所以确保仔细阅读你所使用的客户端的文件.
缺点:
这种方法虽然是很简便的, 但是也有一些主要的缺点, 它大大降低了发布者发布的效率, 因为一个消息的确认, 阻止了发布随后将要发布的所有消息.这种方法, 将不能提供每秒发送几百条消息的吞吐量, 但是这种方法对于某一些应用来说, 还是很不错的, 足够支持一个应用了.
批量发布
int batchSize = 100; int outstandingMessageCount = 0; while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirmsOrDie(5_000); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { channel.waitForConfirmsOrDie(5_000); }
对比上面的一次性发布确认, 可以看到这个java代码有很大的不同, 首先, 他不并不是每一次循环都进行一个等待确认, 而是当 outstandingMessageCount == batchSize 这个条件成立再进行确认.
等待批量发送的消息被确认, 这个提高了吞吐量(对比于单独确认), 差不多时单独确认的20 ~ 30倍的效率提升, 但是他的一个缺点就是, 我们不能明确知道在失败的情况中, 是什么原因造成这种失败. 所以我们需要让整个批量发送维护在内存中来记录一些有用的东西, 或者重新发送该消息, 并且这种方法依然是同步的, 也就是在等待确认的时候, 会阻塞当前线程, 也就会阻止当前线程继续publish消息.
异步确认
Channel channel = connection.createChannel(); channel.confirmSelect(); channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed }, (sequenceNumber, multiple) -> { // code when message is nack-ed });
broker异步的确认发送过来的消息, 仅仅只需要在客户端上注册一个回调函数, 来监视这些确认信息.
这里有两个回调, 一个是已经确认的消息, 一个是被拒绝的消息(你可以理解为被RabbitMQ丢弃的消息), 每一次回调都有两个参数:
- sequenceNumber: 序列号, 这个序列号码用来标记被确认或者被拒绝的消息,
- multiple : boolean类型的数据, 如果为false, 那么仅仅是一个消息被确认或者拒绝. 如果为true, 所有的小于等于sequenceNumber的消息都会被确认或者拒绝.
每个消息在发布之前, 你可以通过下面的方法来获取到序列号:
int sequenceNumber = channel.getNextPublishSeqNo()); ch.basicPublish(exchange, queue, properties, body);
你可以使用这个序列号来找到对应的被拒绝或者是被确认的消息, 然后做出相关的处理操作. 但是在此之前, 你应该首先维护一个 key -value 的map, 以便记录sequenceNumber和对应消息的关联.
下面是一些代码案例:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); // ... code for confirm callbacks will come later String body = "..."; outstandingConfirms.put(channel.getNextPublishSeqNo(), body); channel.basicPublish(exchange, queue, properties, body.getBytes());
如何使用这个ConcurrentSkipListMap?
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap( sequenceNumber, true ); confirmed.clear(); } else { outstandingConfirms.remove(sequenceNumber); } }; channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { String body = outstandingConfirms.get(sequenceNumber); System.err.format( "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", body, sequenceNumber, multiple ); cleanOutstandingConfirms.handle(sequenceNumber, multiple); }); // ... publishing code
相关的实体类说明
Delivery消息体
源码:
public class Delivery { private final Envelope _envelope; private final AMQP.BasicProperties _properties; private final byte[] _body; public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { this._envelope = envelope; this._properties = properties; this._body = body; } public Envelope getEnvelope() { return this._envelope; } public AMQP.BasicProperties getProperties() { return this._properties; } public byte[] getBody() { return this._body; } }
解析:
这个Delivery
类是在RabbitMQ的Java客户端中使用的,用于封装从RabbitMQ服务器接收到的消息。下面我将详细解释类中的参数和方法的作用:
参数:
Envelope envelope
:
- 作用:这个参数包含了消息的元数据,比如消息的
deliveryTag
(投递标签)、exchange
(交换机)名称、routingKey
(路由键)等。 - 重要字段:
deliveryTag
是一个唯一的标识符,用于确认(ack)或拒绝(nack)特定的消息。
AMQP.BasicProperties properties
:
- 作用:这个参数包含了消息的附加属性,比如消息的内容类型、消息头部、消息的优先级、消息的发布和过期时间等。
- 重要字段:
contentType
表示消息的内容类型(例如,text/plain
或application/json
),headers
可以包含自定义的键值对,用于传递额外的信息。
byte[] body
:
- 作用:这个参数包含了消息的实际内容。在RabbitMQ中,消息的内容被表示为字节数组,这意味着你可以发送任何类型的数据,只要你能将其转换为字节。
- 处理方式:通常,你需要根据
properties
中的contentType
字段来确定如何解析这个字节数组。例如,如果contentType
是text/plain
,你可能需要将其转换为字符串;如果是application/json
,你可能需要将其解析为JSON对象。
方法:
public Envelope getEnvelope()
:
- 作用:这个方法返回消息的元数据(
Envelope
对象)。通过这个方法,你可以获取到消息的deliveryTag
,进而在处理完消息后进行确认或拒绝操作。
public AMQP.BasicProperties getProperties()
:
- 作用:这个方法返回消息的附加属性(
AMQP.BasicProperties
对象)。你可以使用这个方法获取到消息的contentType
、headers
等字段,以便正确地解析和处理消息内容。
public byte[] getBody()
:
- 作用:这个方法返回消息的实际内容(字节数组)。你需要根据
getProperties()
返回的属性来确定如何解析这个字节数组。
使用场景:
当你在RabbitMQ的Java客户端中消费消息时,RabbitMQ服务器会将消息封装为一个Delivery
对象,并通过DeliverCallback
回调给你。你可以在回调中处理这个消息,例如解析消息内容、执行业务逻辑,并在处理完后通过channel.basicAck
方法发送确认。
总之,Delivery
类及其参数和方法在RabbitMQ的Java客户端中起到了封装和传递消息的作用,使得开发者能够方便地获取和处理从RabbitMQ服务器接收到的消息。
RabbitMQ Tutorial by Java(2)https://developer.aliyun.com/article/1517445