一文带大家快速掌握RabbitMQ!(三)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 一文带大家快速掌握RabbitMQ!

一文带大家快速掌握RabbitMQ!(二)https://developer.aliyun.com/article/1624434


Return消息机制

用于处理一些不可路由的消息。

基础API

有一个关键配置项:Mandatory:true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker会自动删除该消息

默认为false,当我们使用Return 消息机制的时候,我们需要将它设置为true

消息的生产者通过制定Exchange和RoutingKey,把消息投递到某一个队列中,消费者监听队列,进行消费。

但在一些情况下,发送消息时,Exchange不存在或RoutingKey路由不到,Return Listener就会监听这种不可达的消息,然后进行处理。

Return Listener代码

public class ReturnProducer {
    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.key";
    private static final String ROUTING_KEY_ERROR = "wrong.key";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 消息
        String msg = "Send message of return demo";
        // 添加并设置Return监听器
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("============ handleReturn ============");
                System.err.println("replyCode —— " + replyCode);
                System.err.println("replyText —— " + replyText);
                System.err.println("exchange —— " + exchange);
                System.err.println("routingKey —— " + routingKey);
                System.err.println("properties —— " + properties);
                System.err.println("body —— " + new String(body));
            }
        });
        // 设置Mandatory为true, 可以进行后续处理, 不会删除消息。
        // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, msg.getBytes());
        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, msg.getBytes());
    }
}

消费端自定义监听

自定义监听的原因:

  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理!
  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!
  • 非常简单,消费者只需要继承DefaultConsumer类,然后重写handleDelivery方法即可;

继承DefaultConsumer的此类被写出后,需要进行绑定。(在交换机绑定时绑定自定义的Consumer);


public class ReturnConsumer {
    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.#";
    private static final String QUEUE_NAME = "return_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机与队列, 指定路由键
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

消费端限流

当巨量消息瞬间全部推送时,单个客户端无法同时处理这些数据,服务器容易故障。因此要进行消费端限流

RabbitMQ提供了一种Qos(服务质量保证)功能,即在非自动确认前提下,如果一定数目的消息未被确认前(通过consume或者channel设置Qos值),不进行消费新消息。


void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

prefetchSize:消息限制大小,一般为0,不做限制。

prefetchCount:一次处理消息的个数,一般设置为1

global:一般为false。true,在channel级别做限制;false,在consumer级别做限制


public class QosConsumer {
    private static final String EXCHANGE_NAME = "qos_exchange";
    private static final String ROUTING_KEY = "qos.#";
    private static final String QUEUE_NAME = "qos_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机与队列, 指定路由键
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
                // 手动ack签收
                channel.basicAck(envelope.getDeliveryTag(), false); // 不批量签收
            }
        };
        /**
         *  prefetchSize: 0 不限制消息大小
         *  prefetchCount: 一次处理消息的个数, ack后继续推送
         *  global: false 应用在consumer级别
         */
        channel.basicQos(0, 1, false);
        //限流:autoAck需设置为false, 关闭自动签收
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

限流需要设置channel.basicQos(0, 1, false);

关闭autoAck,且需要手动签收。

在重写的handleDelivery方法中,如果没有进行手动签收channel.basicAck(),那么消费端在接收消息时,因为prefetchCount设置为1,只会接收1条消息,剩下的消息的等待中,并不会被推送,直到手动ack后。

队列

消费端ACK与重回队列机制

消费端的手工ACK和NACK:

消费端进行消费时,可能由于业务异常,会调用NACK拒绝确认,而到了一定次数,就直接ACK,将异常消息进行日志的记录,然后进行补偿。

由于服务器宕机等严重问题,消费端没消费成功,重发消息后,需要手工ACK保障消费端消费成功。

消费端的重回队列:

将没有处理成功的消息重新回递给Broker。

一般在实际应用中,会关闭重回队列。

TTL队列

TTL:Time To Live,生存时间。

可以指定消息的过期时间。

可以指定队列的过期时间,从消息入队列开始计算,超过了队列的超时时间设置,那么消息会自动清除


AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .build();

死信队列

DLX:Dead-Letter-Exchange

当消息在队列中变成死信时,能被重新publish到另一个Exchange,该Exchange就是DLX

发生死信队列的情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false(没有重回队列)
  • 消息TTL过期
  • 队列达到最大长度

死信队列的设置:

  1. 正常声明交换机,队列并绑定,需要在队列上设置一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange");
  2. 声明死信队列的Exchange和Queue,然后进行绑定:Exchange: dlx.exchange``Queue: dlx.queue``RoutingKey: #
  3. 在消息过期、requeue、队列达到最大长度时(即为死信),消息会发送到指定的dlx.exchange交换机上,消费者会监听该交换机所绑定的死信队列。


public class DlxConsumer {
    private static final String EXCHANGE_NAME = "dlx_exchange";
    private static final String ROUTING_KEY = "dlx.#";
    private static final String QUEUE_NAME = "dlx_queue";
 // DLX
    private static final String DLX_EXCHANGE = "dlx.exchange";
    private static final String DLX_QUEUE = "dlx.queue";
    private static final String DLX_ROUTING_KEY = "#";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 1. 设置死信队列的参数
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 2. 声明死信队列
        channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
                // 手动ack签收
                channel.basicAck(envelope.getDeliveryTag(), false); // false 不批量签收
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

最后

文章内容收录到个人网站,方便阅读hardyfish.top/

参考和鸣谢:

官网:www.rabbitmq.com/

视频:RabbitMQ消息中间件技术精讲

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 负载均衡
|
30天前
|
消息中间件 数据库 存储
一文带大家快速掌握RabbitMQ!(二)
一文带大家快速掌握RabbitMQ!
一文带大家快速掌握RabbitMQ!(二)
|
2月前
|
消息中间件 存储 Java
rabbitmq(一)
rabbitmq(一)
|
5月前
|
消息中间件 网络协议 API
rabbitmq
rabbitmq
48 2
|
6月前
|
消息中间件 大数据 Java
RabbitMQ
RabbitMQ
95 1
|
6月前
|
消息中间件 存储 负载均衡
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理软件,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了一种可靠的、强大的、灵活的消息传递机制,使得不同应用程序或组件之间可以轻松地进行通信。
64 0
|
消息中间件 存储 数据库
RabbitMQ特殊应用
RabbitMQ特殊应用
60 0
|
消息中间件
1. 什么是 RabbitMQ?
1. 什么是 RabbitMQ?
62 0
|
消息中间件 存储 网络协议
rabbitmq的介绍
rabbitMQ是一个开源的AMQP实现的消息队列中间件,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用。
|
消息中间件 存储 NoSQL
RabbitMQ(二)
RabbitMQ(二)
244 0
RabbitMQ(二)
下一篇
无影云桌面