RabbitMQ 常用 API(二)

简介: RabbitMQ 常用 API(二)

发送消息

  • 方法:
void basicPublish(String exchange, String routingKey, 
                  BasicProperties props, byte[] body) throws IOException;  // 常用
void basicPublish(String exchange, String routingKey, boolean mandatory, 
                  BasicProperties props, byte[] body)throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, 
                  BasicProperties props, byte[] body)throws IOException;
  • exchange:交换器的名称,指明消息需要发送到哪个交换器中
    如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
  • routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中
  • props : 消息的基本属性集,其包含14个属性成员,分别有 contentType、contentEncoding、headers(Map<String, object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。
  • byte[] body: 消息体(payload),真正需要发送的消息
  • mandatory
  • 为 true 时,如果 exchange 根据自身类型和消息 routingKey 无法找到一个合适的 queue 存储消息,那么 broker 会调用 basic.return 方法将消息返还给生产者;
  • 为 false 时,出现上述情况 broker 会直接将消息丢弃;
  • immediate
  • 为 true 时,如果该消息关联的 queue 上有消费者,则马上将消息投递给它,如果所有 queue 都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
  • 为 false 时,出现上述情况 broker 会直接将消息丢弃;
  • 示例:
byte[] messageBodyBytes = "Hello World!".getBytes();
// 示例:发送一条内容为“Hello World!”的消息
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
// 示例:为了更好地控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息
channel.basicPublish(exchangeName, routingKey, mandatory, 
                     MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
// 示例:这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(即存入磁盘)在服务器中。同时这条消息的优先级( priority)设置为1, content-type为“text/plain”。可以自己设定消息的属性。
channel.basicPublish(exchangeName, routingKey, 
    new AMQP.BasicProperties().builder().contentType("text/plain")
            .deliveryMode(2).priority(1).userId("hidden").build(), 
       messageBodyBytes);
// 示例:发送一条带有headers的消息
Map<String, Object> headers = new HashMap<>();
headers.put("localtion", "here");
headers.put("time", "today");
channel.basicPublish(exchangeName, routingKey,
        new AMQP.BasicProperties().builder().headers(headers).build(),
        messageBodyBytes);
// 示例:发送一条带有过期时间(expiration)的消息
byte[] messageBodyBytes = "Hello World!".getBytes();
channel.basicPublish(exchangeName, routingKey, 
    new AMQP.BasicProperties.Builder().expiration("60000").build(), 
        messageBodyBytes); 


消费消息

RabbitMQ 的消费模式分两种:

  • 推(Push)模式
    推模式采用 Basic.Consume 进行消费
  • 拉(Pull)模式
    拉模式调用 Basic.Get 进行消费


推模式

在推模式中,可以通过持续订阅的方式来消费消息。

Channel 类中的常用 basicConsume 方法:

String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;  // 常用
String basicConsume(String queue, boolean autoAck, String consumerTag, 
                    Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, 
                    Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, 
                    boolean noLocal, boolean exclusive, 
                    Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, 
                    DeliverCallback deliverCallback, CancelCallback cancelCallback, 
                    ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
  • queue:消息队列的名称
  • autoAck:是否自动确认(为了确保消息不会丢失,RabbitMQ 支持消息应答)
    建议设成 false,即不自动确认。消费者在已经接收并且处理完毕消息后调用 channel.basicAck 来确认消息已被成功接收(发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了,RabbitMQ 可以删除它了)
  • consumerTag:消费者标签,用来区分多个消费者
    当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分。
  • noLocal:为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
  • exclusive:设置是否排他
  • arguments:设置消费者的其他参数
  • deliverCallback:(消息传递时回调)设置消费者的回调参数。
    用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer,使用时需要客户端重写其中的方法
  • cancelCallback:消费者被取消时回调
  • shutdownSignalCallback:当通道/连接关闭时回调
  • consumer:设置消费者的回调函数用来处理RabbitMQ 推送过来的消息,接收消息一般通过实现com.rabbitmq.client.Consumer 接口或者继承com.rabbitmq.client.DefaultConsumer类来实现。对于消费者客户端来说重写 DefaultConsumer 类的 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:
// 在其他方法之前调用
public void handleConsumeOk(String consumerTag)
// 在显示地或者隐式的取消订阅时调用
public void handleCancelOk(String consumerTag)
public void handleCancel(String consumerTag) throws IOException
// 当Channel或者Connection关闭的时候会调用
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
public void handleRecoverOk(String consumerTag)
  • 注:
  • 可以通过 channel.casicCancel 方法来显示的取消一个消费者的订阅:
channel.basicCancel(consumerTag);
  • 该行代码会首先触发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法


和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel 等。

每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者,但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被“耽搁”。

示例:

boolean autoAck = false;
channel.basicQos(64);
String consumerTag = "myConsumerTag";
channel.basicConsume(queueName, autoAck, consumerTag,
        new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                //process the message components here ...
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

注意:上面代码中显式地设置 autoAck=false,然后在接收到消息之后进行显式 ack 操作(channel.basicAck ),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。


拉模式

拉模式的消费方式:通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetRespone

Channel 类的 basicGet 方法:

GetResponse basicGet (String queue, boolean autoAck).throws IOException;
  • queue :队列的名称
  • autoAck:是否自动确认(为了确保消息不会丢失,RabbitMQ 支持消息应答)
    建议设成 false,即不自动确认。消费者在已经接收并且处理完毕消息后调用 channel.basicAck 来确认消息已被成功接收(发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了,RabbitMQ 可以删除它了)

示例:

GetResponse getResponse = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(getResponse.getBody()));
channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);

图示:


推 | 拉模式 总结
  • Basic. Consume 将信道(Channel)置为接收模式,直到取消队列的订阅为止。
    在接收模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制。
  • 如果只想从队列获得单条消息而不是持续订阅,建议使用 Basic.Get 进行消费。
    但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic. Consume 方法。


消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(messageacknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:

  • 一部分是等待投递给消费者的消息
  • 一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息
    如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
    RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbtiMQ的Web 管理平台上可以看到当前队列中的“Ready”状态和“Unacknowledged”状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数:


限制未确认消息数量
void basicQos(int prefetchCount) 
void basicQos(int prefetchCount, boo1ean global) 
void basicQos(int prefetchSize, int prefetchCount, boo1ean global) 
  • prefetchSize:消费者所能接收未确认消息的总体大小的上限,单位为 B
  • prefetchCount:限制信道上的消费者所能保持的最大未确认消息的数量
    即一旦超过指定个数的消息还没有 ack(确认),则该 consumer 将 block 掉,直到有消息 ack
  • global:是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别


拒绝消息

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,RabbitMQ 在 2.0.0 版本开始引入了Basic.Relject 这个命令,消费者客户端可以调用与其对应的 channel. basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的 API 方法定义如下:

// 拒绝一条消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
// 批量拒绝消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag 参数:消息的编号,它是一个 64 位的长整型值,最大值是 9223372036854775807
  • requeue参数:是否重新将这条消息存入队列
  • 设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者
  • 设置为 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者
  • multiple参数:是否批量拒绝消息
  • 设置为 false,则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack 和 basicReject 方法一样
  • 设置为 true,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息
  • 注:将 channel.basicReject 或者 channel.basicNack 中的 requeue 设置为 false,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。


重新发送未确认的消息

对于 requeue,AMQP 中的 Basic. Recover 命令具备可重入队列的特性,可以用来请求 RabbitMQ 重新发送还未被确认的消息。

其对应的客户端方法为:

Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
  • requeue 参数:是否将未被确认的消息分配给与之前相同的消费者
  • 设置为 true,则未被确认的消息会被重新加入到队列中,对于同一条消息来说,可能会被分配给与之前不同的消费者
  • 设置为 false,那么同一条消息会被分配给与之前相同的消费者
  • 默认情况下,如果不设置 requeue 参数,requeue 默认为 true


参考

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 Java 测试技术
SpringBoot整合RabbitMQ图文过程以及RabbitTemplate常用API介绍
SpringBoot整合RabbitMQ图文过程以及RabbitTemplate常用API介绍
107 0
|
8月前
|
消息中间件 存储 安全
RabbitMQ 常用 API(一)
RabbitMQ 常用 API(一)
102 0
|
2天前
|
消息中间件 API RocketMQ
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
32 0
|
2天前
|
消息中间件 存储 Java
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
177 0
|
6月前
|
消息中间件 API 数据安全/隐私保护
使用 REST API 操作 RabbitMQ(二)
使用 REST API 操作 RabbitMQ
|
6月前
|
消息中间件 JSON API
使用 REST API 操作 RabbitMQ(一)
使用 REST API 操作 RabbitMQ
181 0
|
消息中间件 运维 Cloud Native
RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。
369 0
RocketMQ 5.0 API 与  SDK 的演进
|
消息中间件 缓存 运维
RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。
RocketMQ 5.0 API 与 SDK 的演进
EMQ
|
消息中间件 缓存 运维
NanoMQ Newsletter 2022-08|v0.11:MQTT 5.0 + MQTT over QUIC 桥接,新增 HTTP API 监控客户端状态
八月,0.11.0版本发布:增加了MQTT 5.0+MQTT over QUIC桥接模式,新增和修复了对已连接客户端状态进行监控和查询的HTTP API。
EMQ
272 0
NanoMQ Newsletter 2022-08|v0.11:MQTT 5.0 + MQTT over QUIC 桥接,新增 HTTP API 监控客户端状态
EMQ
|
边缘计算 监控 物联网
NanoMQ Newsletter 2022-07|v0.10:多路桥接、HTTP 发布 MQTT 消息 API、NanoSDK 支持 MQTT 5.0
v0.10.0已于8月初正式发布,此版本主要增强了桥接功能,新增了发布消息的HTTP API,同时还为NanoSDK增加了MQTT 5.0支持。
EMQ
271 0
NanoMQ Newsletter 2022-07|v0.10:多路桥接、HTTP 发布 MQTT 消息 API、NanoSDK 支持 MQTT 5.0

热门文章

最新文章