RabbitMQ消息队列的原理和实践

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMQ消息队列的原理和实践

本文首先会介绍消息队列(MQ)的基本概念,也会对比目前业界常见的MQ区别,之后会介绍RabbitMQ的核心概念和其实用。

一、消息队列(MQ)介绍和对比

消息队列主要用来在不同系统之间进行消息传递,他是服务间进行异步通信的一种常见方式,RabbitMQ作为一种常见的消息中间件,经常被用在处理异步消息、进行系统间解耦、进行大流量的削峰填谷的作用。

但引入消息中间件同时也会带来一些坏处,那就是增加了系统的复杂性,如果消息中间件不可用,可能就会导致整个系统的不可用,这样就降低了系统的可用性,同时由于RabbitMQ的本身的特性,他在面对消息传递过大的时候,会造成消息积压的问题,这些都是在生产环境使用RabbitMQ需要考虑的问题。

下面是对比业界常用的3种MQ:

优点 缺点 使用场景
kafka 吞吐量非常大, 性能非常好, 集群高可用。 会丢数据, 功能比较单一。 日志分析 , 大数据采集
RabbitMQ 消息可靠性高, 功能全面。 吞吐量比较低, 消息积累会影响性能, erlang语言不好定制。 小规模场景
RocketMQ 高吞吐,高性能,高可用, 功能全面。 开源版功能不如云上版, 官方文档比较简单, 客户端只支持java。 几乎全场景

总的来说,kafka在常见MQ中性能最高,但是存在数据丢失的问题,所以一般用在日志采集中;RabbitMQ的可靠性最高、功能最全面,但是其存在消息积压的问题,所以一般用在需要保障消息不丢失的小规模场景中;RocketMQ汲取了kafka和RabbitMQ的长处,权衡了性能和可靠性,所以适用大多数场景。

二、RabbitMQ基本概念

以下是RabbitMQ的通信模型,总的来说这是一种生产者/消费者的通信模型,生产者和RabbitMQ通过Connection建立连接后,生产者将消息发送给RabbitMQ,消息会直接投递到Exchange,Exchange根据路由信息再转发到对应的Queue中,下游如果关联了消费者,就会直接推送给消费者。

  1. Message 消息:消息是不具名的,它由消息头header和消息体payload组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  2. Producer消息生产者:也是一个向交换器Exchange发布消息的客户端应用程序。
  3. Exchange 交换器:用来接收生产者发送的消息并将这些消息路由给服务器中的队列Queue。
  4. Binding 绑定:用于消息队列Queue和交换器Exchange之间的关联。一个绑定就是基于Routing key将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  5. Queue 消息队列:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  6. Connection 网络连接:一般是一个TCP连接。
  7. Channel 信道:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  8. Consumer 消息消费者:表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host 虚拟主机:表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  10. Broker 服务器:表示消息队列服务器实体。

其中需要重点说明的是Exchange的类别,包括:

  1. direct模式:单播模式,每个Exchange根据Binding中routing key将消息转发到一个queue中;
  2. fanout模式:广播模式,一个Exchange可以关联多个queue,能将消息转发到多个queue中;
  3. topic模式:匹配模式,Exchange和queue通过消息中的routing key来匹配决定放到哪个queue;

总的来说,就是direct模式只会将消息投递到一个queue中,fanout模式会将消息投递绑定的所有queue中,topic模式会根据routing key进行模式匹配,最后投递到对应的queue中。

三、常见问题

RabbitMQ如何保障消息传递的可靠性?

RabbitMQ的消息在投递过程中经过如下阶段:生产者->RabbitMQ服务器->生产者,其中可能出现消息丢失的情况为:(1)生产者传递到RabbitMQ服务器消息丢失;(2)RabbitMQ服务器内部消息丢失;(3)消费者处理RabbitMQ服务器失败;

1.生产者传递到RabbitMQ服务器消息丢失

方案一:开启RabbitMQ的事务机制,通过对channel设置为事务模式,可以通过channel.txCommit()、 channel.txRollback()来控制提交或者回滚,但是但设置channel为事务模式时候,其消息传递是阻塞式,传递效率不到,所以一般不推荐使用。

方案二:利用消息confirm机制,当生产者在传递消息的时候,都会生产一个id,当消息被RabbitMQ的exchange接收到了就会进行confirm确认。

(2)RabbitMQ服务器内部消息丢失

方案一:设置exchange到queue的returnCallback,这个是当exchange进过路由无法正常投递到queue的时候,会进行回调处理。

方案二:设置备份交换机,在创建交换机的时候可以设置备份交换机,当该交换,机无效就会使用备份交换机。

方案三:设置持久化,为message、queue、exchange等资源设置持久化策略,这样但RabbitMQ宕机后重启数据也不会丢失。

(3)消费者处理RabbitMQ服务器失败;

方案一:针对RabbitMQ消费数据,默认情况是自动ack的,如果需要确认消费者正常处理消息了才ack,只需要将其改为手动ack即可。

2.RabbitMQ如何保障消息消费的顺序性?

四、代码实现

使用RabbitMQ的主要流程:

(1)引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)配置properties配置文件

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=xxxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxxx
spring.rabbitmq.password=xxxx
spring.rabbitmq.virtual-host=vhost
# 开启发送失败退回(消息有没有找到合适的队列)
spring.rabbitmq.publisher-returns=true
#开启消息的confirm机制 correlated:开启;NONE:关闭
spring.rabbitmq.publisher-confirms-type=correlated
#在需要使用消息的return机制时候,此参数必须设置为true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.type=simple
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=simple
#消费方消息确认:手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

(3)配置RabbitMQ的配置类

@Slf4j
@Configuration
public class RabbitConfig {
    // ------------------------普通队列 start------------------------
    // 普通队列
    @Bean
    public Queue normalQueue() {
        return new Queue(MqConstant.MQ_WEBSITE_NORMAL_QUEUE, true);
    }
    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(MqConstant.MQ_WEBSITE_NORMAL_EXCHANGE, true, false);
    }
    // 绑定普通消息队列
    @Bean
    public Binding normalBind() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(MqConstant.MQ_WEBSITE_NORMAL_ROUTING_KEY);
    }
    // ------------------------普通队列 end------------------------
    @Bean("defaultRabbitConnectionFactory")
    @Primary
    public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost("vhost");
        cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode());
        // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
        cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
        return cachingConnectionFactory;
    }
    /** ======================== 定制一些处理策略 =============================*/
    /**
     * 定制化amqp模版
     * <p>
     * Rabbit MQ的消息确认有两种。
     * <p>
     * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
     * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
     * <p>
     * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
     * Springboot中使用ConfirmCallback和ReturnCallback
     * 注意:
     *  在需要使用消息的return机制时候,mandatory参数必须设置为true
     * 新版本开启消息的confirm配置publisher-confirms已经过时,改为使用publisher-confirm-type参数设置(correlated:开启;NONE:关闭)
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
//        设置开启Mandatory才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        /**
         * 使用该功能需要开启消息确认 无论成功与否,需要配置 publisher-confirms: true
         * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
         * correlationData  消息唯一标志
         * ack              确认结果
         * cause            失败原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback:" + "相关数据:" + correlationData);
            log.info("ConfirmCallback:" + "确认情况:" + ack);
            log.info("ConfirmCallback:" + "原因:" + cause);
        });
        /**
         * 消息从Exchange路由到Queue失败的回调
         * 使用该功能需要开启消息返回确认,需要配置 publisher-returns: true
         * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
         * message    消息主体 message
         * replyCode  消息主体 message
         * replyText  描述
         * exchange   消息使用的交换机
         * routingKey 消息使用的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback:" + "消息:" + message);
            log.info("ReturnCallback:" + "回应码:" + replyCode);
            log.info("ReturnCallback:" + "回应信息:" + replyText);
            log.info("ReturnCallback:" + "交换机:" + exchange);
            log.info("ReturnCallback:" + "路由键:" + routingKey);
        });
        return rabbitTemplate;
    }
} 

(4)配置消息生产者

@Component
@Slf4j
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    // 发送普通消息
    public void sendMsg(String exchange,String routingKey, String content) {
        // DirectExchange类型的交换机,必须指定对应的路由键
        rabbitTemplate.convertAndSend(exchange, routingKey, content);
        log.info("=================start sendMsg, exchange:{}, routingKey:{}, content:{}================",
                exchange, routingKey, content);
    }
}

(5)配置消息消费者

@Component
@Slf4j
public class Receiver {
    // 消费普通消息
    @RabbitListener(queues = {"website_normal_queue"})
    @RabbitHandler
    public void process1(String content, Message message, Channel channel) throws IOException {
        try {
            log.info("普通队列的内容[{}]", content);
            // 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("普通信息处理完毕");
        } catch (Exception e) {
            log.error("处理失败:{}", e.getMessage());
            // 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}

代码实现https://github.com/yangnk/SpringBoot_Learning/tree/master/RabbitMQDemo


TODO

  • 常见问题还需要整理一下;
  • 还需要描写一下死信队列;

参考资料

  1. springboot 整合 RabbitMQ#:https://www.cnblogs.com/MrYuChen-Blog/p/15984975.html#基础配置项


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
29天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
80 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
73 9