RabbitMQ消息队列的原理和实践

简介: RabbitMQ消息队列的原理和实践

本文首先会介绍消息队列(MQ)的基本概念,也会对比目前业界常见的MQ区别,之后会介绍RabbitMQ的核心概念和其实用。

一、消息队列(MQ)介绍和对比

消息队列主要用来在不同系统之间进行消息传递,他是服务间进行异步通信的一种常见方式,RabbitMQ作为一种常见的消息中间件,经常被用在处理异步消息、进行系统间解耦、进行大流量的削峰填谷的作用。

但引入消息中间件同时也会带来一些坏处,那就是增加了系统的复杂性,如果消息中间件不可用,可能就会导致整个系统的不可用,这样就降低了系统的可用性,同时由于RabbitMQ的本身的特性,他在面对消息传递过大的时候,会造成消息积压的问题,这些都是在生产环境使用RabbitMQ需要考虑的问题。

下面是对比业界常用的3种MQ:

优点 缺点 使用场景
kafka 吞吐量非常大, 性能非常好, 集群高可用。 会丢数据, 功能比较单一。 日志分析 , 大数据采集
RabbitMQ 消息可靠性高, 功能全面。 吞吐量比较低, 消息积累会影响性能, erlang语言不好定制。 小规模场景
RocketMQ 高吞吐,高性能,高可用, 功能全面。 开源版功能不如云上版, 官方文档比较简单, 客户端只支持java。 几乎全场景

总的来说,kafka在常见MQ中性能最高,但是存在数据丢失的问题,所以一般用在日志采集中;RabbitMQ的可靠性最高、功能最全面,但是其存在消息积压的问题,所以一般用在需要保障消息不丢失的小规模场景中;RocketMQ汲取了kafka和RabbitMQ的长处,权衡了性能和可靠性,所以适用大多数场景。

二、RabbitMQ基本概念

以下是RabbitMQ的通信模型,总的来说这是一种生产者/消费者的通信模型,生产者和RabbitMQ通过Connection建立连接后,生产者将消息发送给RabbitMQ,消息会直接投递到Exchange,Exchange根据路由信息再转发到对应的Queue中,下游如果关联了消费者,就会直接推送给消费者。

  1. Message 消息:消息是不具名的,它由消息头header和消息体payload组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  2. Producer消息生产者:也是一个向交换器Exchange发布消息的客户端应用程序。
  3. Exchange 交换器:用来接收生产者发送的消息并将这些消息路由给服务器中的队列Queue。
  4. Binding 绑定:用于消息队列Queue和交换器Exchange之间的关联。一个绑定就是基于Routing key将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  5. Queue 消息队列:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  6. Connection 网络连接:一般是一个TCP连接。
  7. Channel 信道:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  8. Consumer 消息消费者:表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host 虚拟主机:表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  10. Broker 服务器:表示消息队列服务器实体。

其中需要重点说明的是Exchange的类别,包括:

  1. direct模式:单播模式,每个Exchange根据Binding中routing key将消息转发到一个queue中;
  2. fanout模式:广播模式,一个Exchange可以关联多个queue,能将消息转发到多个queue中;
  3. topic模式:匹配模式,Exchange和queue通过消息中的routing key来匹配决定放到哪个queue;

总的来说,就是direct模式只会将消息投递到一个queue中,fanout模式会将消息投递绑定的所有queue中,topic模式会根据routing key进行模式匹配,最后投递到对应的queue中。

三、常见问题

RabbitMQ如何保障消息传递的可靠性?

RabbitMQ的消息在投递过程中经过如下阶段:生产者->RabbitMQ服务器->生产者,其中可能出现消息丢失的情况为:(1)生产者传递到RabbitMQ服务器消息丢失;(2)RabbitMQ服务器内部消息丢失;(3)消费者处理RabbitMQ服务器失败;

1.生产者传递到RabbitMQ服务器消息丢失

方案一:开启RabbitMQ的事务机制,通过对channel设置为事务模式,可以通过channel.txCommit()、 channel.txRollback()来控制提交或者回滚,但是但设置channel为事务模式时候,其消息传递是阻塞式,传递效率不到,所以一般不推荐使用。

方案二:利用消息confirm机制,当生产者在传递消息的时候,都会生产一个id,当消息被RabbitMQ的exchange接收到了就会进行confirm确认。

(2)RabbitMQ服务器内部消息丢失

方案一:设置exchange到queue的returnCallback,这个是当exchange进过路由无法正常投递到queue的时候,会进行回调处理。

方案二:设置备份交换机,在创建交换机的时候可以设置备份交换机,当该交换,机无效就会使用备份交换机。

方案三:设置持久化,为message、queue、exchange等资源设置持久化策略,这样但RabbitMQ宕机后重启数据也不会丢失。

(3)消费者处理RabbitMQ服务器失败;

方案一:针对RabbitMQ消费数据,默认情况是自动ack的,如果需要确认消费者正常处理消息了才ack,只需要将其改为手动ack即可。

2.RabbitMQ如何保障消息消费的顺序性?

四、代码实现

使用RabbitMQ的主要流程:

(1)引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)配置properties配置文件

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=xxxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxx
spring.rabbitmq.virtual-host=vhost
# 开启发送失败退回(消息有没有找到合适的队列)
spring.rabbitmq.publisher-returns=true
#开启消息的confirm机制 correlated:开启;NONE:关闭
spring.rabbitmq.publisher-confirms-type=correlated
#在需要使用消息的return机制时候,此参数必须设置为true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.type=simple
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=simple
#消费方消息确认:手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

(3)配置RabbitMQ的配置类

@Slf4j
@Configuration
public class RabbitConfig {
    // ------------------------普通队列 start------------------------
    // 普通队列
    @Bean
    public Queue normalQueue() {
        return new Queue(MqConstant.MQ_WEBSITE_NORMAL_QUEUE, true);
    }
    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(MqConstant.MQ_WEBSITE_NORMAL_EXCHANGE, true, false);
    }
    // 绑定普通消息队列
    @Bean
    public Binding normalBind() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(MqConstant.MQ_WEBSITE_NORMAL_ROUTING_KEY);
    }
    // ------------------------普通队列 end------------------------
    @Bean("defaultRabbitConnectionFactory")
    @Primary
    public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost("vhost");
        cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode());
        // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
        cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
        return cachingConnectionFactory;
    }
    /** ======================== 定制一些处理策略 =============================*/
    /**
     * 定制化amqp模版
     * <p>
     * Rabbit MQ的消息确认有两种。
     * <p>
     * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
     * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
     * <p>
     * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
     * Springboot中使用ConfirmCallback和ReturnCallback
     * 注意:
     *  在需要使用消息的return机制时候,mandatory参数必须设置为true
     * 新版本开启消息的confirm配置publisher-confirms已经过时,改为使用publisher-confirm-type参数设置(correlated:开启;NONE:关闭)
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
//        设置开启Mandatory才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        /**
         * 使用该功能需要开启消息确认 无论成功与否,需要配置 publisher-confirms: true
         * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
         * correlationData  消息唯一标志
         * ack              确认结果
         * cause            失败原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback:" + "相关数据:" + correlationData);
            log.info("ConfirmCallback:" + "确认情况:" + ack);
            log.info("ConfirmCallback:" + "原因:" + cause);
        });
        /**
         * 消息从Exchange路由到Queue失败的回调
         * 使用该功能需要开启消息返回确认,需要配置 publisher-returns: true
         * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
         * message    消息主体 message
         * replyCode  消息主体 message
         * replyText  描述
         * exchange   消息使用的交换机
         * routingKey 消息使用的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback:" + "消息:" + message);
            log.info("ReturnCallback:" + "回应码:" + replyCode);
            log.info("ReturnCallback:" + "回应信息:" + replyText);
            log.info("ReturnCallback:" + "交换机:" + exchange);
            log.info("ReturnCallback:" + "路由键:" + routingKey);
        });
        return rabbitTemplate;
    }
} 

(4)配置消息生产者

@Component
@Slf4j
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    // 发送普通消息
    public void sendMsg(String exchange,String routingKey, String content) {
        // DirectExchange类型的交换机,必须指定对应的路由键
        rabbitTemplate.convertAndSend(exchange, routingKey, content);
        log.info("=================start sendMsg, exchange:{}, routingKey:{}, content:{}================",
                exchange, routingKey, content);
    }
}

(5)配置消息消费者

@Component
@Slf4j
public class Receiver {
    // 消费普通消息
    @RabbitListener(queues = {"website_normal_queue"})
    @RabbitHandler
    public void process1(String content, Message message, Channel channel) throws IOException {
        try {
            log.info("普通队列的内容[{}]", content);
            // 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("普通信息处理完毕");
        } catch (Exception e) {
            log.error("处理失败:{}", e.getMessage());
            // 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}

代码实现https://github.com/yangnk/SpringBoot_Learning/tree/master/RabbitMQDemo


TODO

  • 常见问题还需要整理一下;
  • 还需要描写一下死信队列;

参考资料

  1. springboot 整合 RabbitMQ#:https://www.cnblogs.com/MrYuChen-Blog/p/15984975.html#基础配置项


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
1月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
120 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
6月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2669 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
9月前
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
8月前
|
消息中间件 监控 数据挖掘
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
168 100
|
6月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
1769 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
6月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
272 11
RocketMQ原理—3.源码设计简单分析下
|
6月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理

热门文章

最新文章