一文带大家快速掌握RabbitMQ!(三)

简介: 一文带大家快速掌握RabbitMQ!

一文带大家快速掌握RabbitMQ!(二)https://developer.aliyun.com/article/1624434


Return消息机制

用于处理一些不可路由的消息。

基础API

有一个关键配置项:Mandatory:true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker会自动删除该消息

默认为false,当我们使用Return 消息机制的时候,我们需要将它设置为true

消息的生产者通过制定Exchange和RoutingKey,把消息投递到某一个队列中,消费者监听队列,进行消费。

但在一些情况下,发送消息时,Exchange不存在或RoutingKey路由不到,Return Listener就会监听这种不可达的消息,然后进行处理。

Return Listener代码

public class ReturnProducer {
    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.key";
    private static final String ROUTING_KEY_ERROR = "wrong.key";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 消息
        String msg = "Send message of return demo";
        // 添加并设置Return监听器
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("============ handleReturn ============");
                System.err.println("replyCode —— " + replyCode);
                System.err.println("replyText —— " + replyText);
                System.err.println("exchange —— " + exchange);
                System.err.println("routingKey —— " + routingKey);
                System.err.println("properties —— " + properties);
                System.err.println("body —— " + new String(body));
            }
        });
        // 设置Mandatory为true, 可以进行后续处理, 不会删除消息。
        // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,null, msg.getBytes());
        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, true, null, msg.getBytes());
    }
}

消费端自定义监听

自定义监听的原因:

  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理!
  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!
  • 非常简单,消费者只需要继承DefaultConsumer类,然后重写handleDelivery方法即可;

继承DefaultConsumer的此类被写出后,需要进行绑定。(在交换机绑定时绑定自定义的Consumer);


public class ReturnConsumer {
    private static final String EXCHANGE_NAME = "return_exchange";
    private static final String ROUTING_KEY = "return.#";
    private static final String QUEUE_NAME = "return_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机与队列, 指定路由键
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

消费端限流

当巨量消息瞬间全部推送时,单个客户端无法同时处理这些数据,服务器容易故障。因此要进行消费端限流

RabbitMQ提供了一种Qos(服务质量保证)功能,即在非自动确认前提下,如果一定数目的消息未被确认前(通过consume或者channel设置Qos值),不进行消费新消息。


void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

prefetchSize:消息限制大小,一般为0,不做限制。

prefetchCount:一次处理消息的个数,一般设置为1

global:一般为false。true,在channel级别做限制;false,在consumer级别做限制


public class QosConsumer {
    private static final String EXCHANGE_NAME = "qos_exchange";
    private static final String ROUTING_KEY = "qos.#";
    private static final String QUEUE_NAME = "qos_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机与队列, 指定路由键
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
                // 手动ack签收
                channel.basicAck(envelope.getDeliveryTag(), false); // 不批量签收
            }
        };
        /**
         *  prefetchSize: 0 不限制消息大小
         *  prefetchCount: 一次处理消息的个数, ack后继续推送
         *  global: false 应用在consumer级别
         */
        channel.basicQos(0, 1, false);
        //限流:autoAck需设置为false, 关闭自动签收
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

限流需要设置channel.basicQos(0, 1, false);

关闭autoAck,且需要手动签收。

在重写的handleDelivery方法中,如果没有进行手动签收channel.basicAck(),那么消费端在接收消息时,因为prefetchCount设置为1,只会接收1条消息,剩下的消息的等待中,并不会被推送,直到手动ack后。

队列

消费端ACK与重回队列机制

消费端的手工ACK和NACK:

消费端进行消费时,可能由于业务异常,会调用NACK拒绝确认,而到了一定次数,就直接ACK,将异常消息进行日志的记录,然后进行补偿。

由于服务器宕机等严重问题,消费端没消费成功,重发消息后,需要手工ACK保障消费端消费成功。

消费端的重回队列:

将没有处理成功的消息重新回递给Broker。

一般在实际应用中,会关闭重回队列。

TTL队列

TTL:Time To Live,生存时间。

可以指定消息的过期时间。

可以指定队列的过期时间,从消息入队列开始计算,超过了队列的超时时间设置,那么消息会自动清除


AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .build();

死信队列

DLX:Dead-Letter-Exchange

当消息在队列中变成死信时,能被重新publish到另一个Exchange,该Exchange就是DLX

发生死信队列的情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false(没有重回队列)
  • 消息TTL过期
  • 队列达到最大长度

死信队列的设置:

  1. 正常声明交换机,队列并绑定,需要在队列上设置一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange");
  2. 声明死信队列的Exchange和Queue,然后进行绑定:Exchange: dlx.exchange``Queue: dlx.queue``RoutingKey: #
  3. 在消息过期、requeue、队列达到最大长度时(即为死信),消息会发送到指定的dlx.exchange交换机上,消费者会监听该交换机所绑定的死信队列。


public class DlxConsumer {
    private static final String EXCHANGE_NAME = "dlx_exchange";
    private static final String ROUTING_KEY = "dlx.#";
    private static final String QUEUE_NAME = "dlx_queue";
 // DLX
    private static final String DLX_EXCHANGE = "dlx.exchange";
    private static final String DLX_QUEUE = "dlx.queue";
    private static final String DLX_ROUTING_KEY = "#";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 1. 设置死信队列的参数
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 2. 声明死信队列
        channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receive Message —— " + new String(body));
                // 手动ack签收
                channel.basicAck(envelope.getDeliveryTag(), false); // false 不批量签收
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

最后

文章内容收录到个人网站,方便阅读hardyfish.top/

参考和鸣谢:

官网:www.rabbitmq.com/

视频:RabbitMQ消息中间件技术精讲

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
运维 资源调度 Kubernetes
阿里巴巴超大规模Kubernetes基础设施运维体系揭秘
在过去近三年时间,Kubernetes架构在阿里巴巴集团经历了飞速发展:既支持了集团统一调度超大规模场景,又支持了大批云上云产品用户,还在支持OXS区容器化。经过这些年的发展,我们也逐渐锤炼出了K8S Serverless全托管稳定性运维体系。这些运维和稳定性能力不是K8S原生就具备的,而是在大量实践和失败过程中的沉淀出来的系统稳定性加固能力。
阿里巴巴超大规模Kubernetes基础设施运维体系揭秘
|
数据采集 人工智能 机器人
RPA与爬虫:自动化工具的本质差异与选择指南
本文深入解析RPA与爬虫的本质差异,帮助企业根据业务需求明智选型。RPA侧重内部流程自动化,爬虫专注外部数据采集。内容涵盖技术原理、应用场景、优劣势对比及主流RPA工具介绍,助力把握自动化趋势,提升效率。
472 0
|
11月前
|
安全 Go
用 Zap 轻松搞定 Go 语言中的结构化日志
在现代应用程序开发中,日志记录至关重要。Go 语言中有许多日志库,而 Zap 因其高性能和灵活性脱颖而出。本文详细介绍如何在 Go 项目中使用 Zap 进行结构化日志记录,并展示如何定制日志输出,满足生产环境需求。通过基础示例、SugaredLogger 的便捷使用以及自定义日志配置,帮助你在实际开发中高效管理日志。
323 1
|
8月前
|
人工智能 监控 开发者
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
146 0
|
12月前
|
消息中间件 数据库 存储
一文带大家快速掌握RabbitMQ!(二)
一文带大家快速掌握RabbitMQ!
一文带大家快速掌握RabbitMQ!(二)
|
12月前
|
消息中间件 存储 网络协议
一文带大家快速掌握RabbitMQ!(一)
一文带大家快速掌握RabbitMQ!
111 2
|
12月前
|
SQL 中间件 关系型数据库
那些年,我们在Go中间件上踩过的坑
作者总结了过去在Go中间件上踩过的坑,这些坑也促进了阿里内部Go中间件的完善,希望大家学习本文后,不犯同样的错误。
|
Java
springboot建站脚手架
springboot建站脚手架
368 0
|
监控 Go 开发者
掌握Go语言中的日志管理
【8月更文挑战第31天】
158 0