本文首先会介绍消息队列(MQ)的基本概念,也会对比目前业界常见的MQ区别,之后会介绍RabbitMQ的核心概念和其实用。
一、消息队列(MQ)介绍和对比
消息队列主要用来在不同系统之间进行消息传递,他是服务间进行异步通信的一种常见方式,RabbitMQ作为一种常见的消息中间件,经常被用在处理异步消息、进行系统间解耦、进行大流量的削峰填谷的作用。
但引入消息中间件同时也会带来一些坏处,那就是增加了系统的复杂性,如果消息中间件不可用,可能就会导致整个系统的不可用,这样就降低了系统的可用性,同时由于RabbitMQ的本身的特性,他在面对消息传递过大的时候,会造成消息积压的问题,这些都是在生产环境使用RabbitMQ需要考虑的问题。
下面是对比业界常用的3种MQ:
优点 | 缺点 | 使用场景 | |
kafka | 吞吐量非常大, 性能非常好, 集群高可用。 | 会丢数据, 功能比较单一。 | 日志分析 , 大数据采集 |
RabbitMQ | 消息可靠性高, 功能全面。 | 吞吐量比较低, 消息积累会影响性能, erlang语言不好定制。 | 小规模场景 |
RocketMQ | 高吞吐,高性能,高可用, 功能全面。 | 开源版功能不如云上版, 官方文档比较简单, 客户端只支持java。 | 几乎全场景 |
总的来说,kafka在常见MQ中性能最高,但是存在数据丢失的问题,所以一般用在日志采集中;RabbitMQ的可靠性最高、功能最全面,但是其存在消息积压的问题,所以一般用在需要保障消息不丢失的小规模场景中;RocketMQ汲取了kafka和RabbitMQ的长处,权衡了性能和可靠性,所以适用大多数场景。
二、RabbitMQ基本概念
以下是RabbitMQ的通信模型,总的来说这是一种生产者/消费者的通信模型,生产者和RabbitMQ通过Connection建立连接后,生产者将消息发送给RabbitMQ,消息会直接投递到Exchange,Exchange根据路由信息再转发到对应的Queue中,下游如果关联了消费者,就会直接推送给消费者。
- Message 消息:消息是不具名的,它由消息头header和消息体payload组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Producer消息生产者:也是一个向交换器Exchange发布消息的客户端应用程序。
- Exchange 交换器:用来接收生产者发送的消息并将这些消息路由给服务器中的队列Queue。
- Binding 绑定:用于消息队列Queue和交换器Exchange之间的关联。一个绑定就是基于Routing key将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Queue 消息队列:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Connection 网络连接:一般是一个TCP连接。
- Channel 信道:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Consumer 消息消费者:表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host 虚拟主机:表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
- Broker 服务器:表示消息队列服务器实体。
其中需要重点说明的是Exchange的类别,包括:
- direct模式:单播模式,每个Exchange根据Binding中routing key将消息转发到一个queue中;
- fanout模式:广播模式,一个Exchange可以关联多个queue,能将消息转发到多个queue中;
- topic模式:匹配模式,Exchange和queue通过消息中的routing key来匹配决定放到哪个queue;
总的来说,就是direct模式只会将消息投递到一个queue中,fanout模式会将消息投递绑定的所有queue中,topic模式会根据routing key进行模式匹配,最后投递到对应的queue中。
三、常见问题
RabbitMQ如何保障消息传递的可靠性?
RabbitMQ的消息在投递过程中经过如下阶段:生产者->RabbitMQ服务器->生产者,其中可能出现消息丢失的情况为:(1)生产者传递到RabbitMQ服务器消息丢失;(2)RabbitMQ服务器内部消息丢失;(3)消费者处理RabbitMQ服务器失败;
1.生产者传递到RabbitMQ服务器消息丢失
方案一:开启RabbitMQ的事务机制,通过对channel设置为事务模式,可以通过channel.txCommit()、 channel.txRollback()来控制提交或者回滚,但是但设置channel为事务模式时候,其消息传递是阻塞式,传递效率不到,所以一般不推荐使用。
方案二:利用消息confirm机制,当生产者在传递消息的时候,都会生产一个id,当消息被RabbitMQ的exchange接收到了就会进行confirm确认。
(2)RabbitMQ服务器内部消息丢失
方案一:设置exchange到queue的returnCallback,这个是当exchange进过路由无法正常投递到queue的时候,会进行回调处理。
方案二:设置备份交换机,在创建交换机的时候可以设置备份交换机,当该交换,机无效就会使用备份交换机。
方案三:设置持久化,为message、queue、exchange等资源设置持久化策略,这样但RabbitMQ宕机后重启数据也不会丢失。
(3)消费者处理RabbitMQ服务器失败;
方案一:针对RabbitMQ消费数据,默认情况是自动ack的,如果需要确认消费者正常处理消息了才ack,只需要将其改为手动ack即可。
2.RabbitMQ如何保障消息消费的顺序性?
四、代码实现
使用RabbitMQ的主要流程:
(1)引入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
(2)配置properties配置文件
spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=xxxx spring.rabbitmq.port=5672 spring.rabbitmq.username=xxxx spring.rabbitmq.password=xxxx spring.rabbitmq.virtual-host=vhost # 开启发送失败退回(消息有没有找到合适的队列) spring.rabbitmq.publisher-returns=true #开启消息的confirm机制 correlated:开启;NONE:关闭 spring.rabbitmq.publisher-confirms-type=correlated #在需要使用消息的return机制时候,此参数必须设置为true spring.rabbitmq.template.mandatory=true spring.rabbitmq.listener.type=simple # 开启发送确认 spring.rabbitmq.publisher-confirm-type=simple #消费方消息确认:手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) spring.rabbitmq.listener.simple.default-requeue-rejected=false
(3)配置RabbitMQ的配置类
@Slf4j @Configuration public class RabbitConfig { // ------------------------普通队列 start------------------------ // 普通队列 @Bean public Queue normalQueue() { return new Queue(MqConstant.MQ_WEBSITE_NORMAL_QUEUE, true); } // 普通交换机 @Bean public DirectExchange normalExchange() { return new DirectExchange(MqConstant.MQ_WEBSITE_NORMAL_EXCHANGE, true, false); } // 绑定普通消息队列 @Bean public Binding normalBind() { return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(MqConstant.MQ_WEBSITE_NORMAL_ROUTING_KEY); } // ------------------------普通队列 end------------------------ @Bean("defaultRabbitConnectionFactory") @Primary public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setHost(rabbitProperties.getHost()); cachingConnectionFactory.setPort(rabbitProperties.getPort()); cachingConnectionFactory.setUsername(rabbitProperties.getUsername()); cachingConnectionFactory.setPassword(rabbitProperties.getPassword()); cachingConnectionFactory.setVirtualHost("vhost"); cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode()); // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType()); return cachingConnectionFactory; } /** ======================== 定制一些处理策略 =============================*/ /** * 定制化amqp模版 * <p> * Rabbit MQ的消息确认有两种。 * <p> * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。 * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列 * <p> * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。 * Springboot中使用ConfirmCallback和ReturnCallback * 注意: * 在需要使用消息的return机制时候,mandatory参数必须设置为true * 新版本开启消息的confirm配置publisher-confirms已经过时,改为使用publisher-confirm-type参数设置(correlated:开启;NONE:关闭) */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 设置开启Mandatory才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); /** * 使用该功能需要开启消息确认 无论成功与否,需要配置 publisher-confirms: true * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调 * correlationData 消息唯一标志 * ack 确认结果 * cause 失败原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { log.info("ConfirmCallback:" + "相关数据:" + correlationData); log.info("ConfirmCallback:" + "确认情况:" + ack); log.info("ConfirmCallback:" + "原因:" + cause); }); /** * 消息从Exchange路由到Queue失败的回调 * 使用该功能需要开启消息返回确认,需要配置 publisher-returns: true * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发 * message 消息主体 message * replyCode 消息主体 message * replyText 描述 * exchange 消息使用的交换机 * routingKey 消息使用的路由键 */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("ReturnCallback:" + "消息:" + message); log.info("ReturnCallback:" + "回应码:" + replyCode); log.info("ReturnCallback:" + "回应信息:" + replyText); log.info("ReturnCallback:" + "交换机:" + exchange); log.info("ReturnCallback:" + "路由键:" + routingKey); }); return rabbitTemplate; } }
(4)配置消息生产者
@Component @Slf4j public class Sender { @Autowired private AmqpTemplate rabbitTemplate; // 发送普通消息 public void sendMsg(String exchange,String routingKey, String content) { // DirectExchange类型的交换机,必须指定对应的路由键 rabbitTemplate.convertAndSend(exchange, routingKey, content); log.info("=================start sendMsg, exchange:{}, routingKey:{}, content:{}================", exchange, routingKey, content); } }
(5)配置消息消费者
@Component @Slf4j public class Receiver { // 消费普通消息 @RabbitListener(queues = {"website_normal_queue"}) @RabbitHandler public void process1(String content, Message message, Channel channel) throws IOException { try { log.info("普通队列的内容[{}]", content); // 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("普通信息处理完毕"); } catch (Exception e) { log.error("处理失败:{}", e.getMessage()); // 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
代码实现:https://github.com/yangnk/SpringBoot_Learning/tree/master/RabbitMQDemo
TODO
- 常见问题还需要整理一下;
- 还需要描写一下死信队列;
参考资料
- springboot 整合 RabbitMQ#:https://www.cnblogs.com/MrYuChen-Blog/p/15984975.html#基础配置项