RabbitMQ部署指北
下载镜像
docker pull rabbitmq:3.8-management
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
RabbitMQ快速入门
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com
SpringAMQP
1,Basic Queue 简单队列模型
2,Work Queue 工作队列模型
3,发布订阅模型 fanout
4,发布订阅模型 Direct
5,发布订阅模型 Topic
6,消息转换器
概念:
AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
AMQP和JMS区别和联系
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
HelloWorld案例
官方的helloword是基于最基础的消息队列模型来实现的,其中包括三个角色
1,publisher:消息发布者,要将消息发布到队列queue
2,queue:消息队列,负责接收并缓存消息
3,consumer:订阅队列,处理队列中的消息
基本消息队列的消息发送流程
1,建立connection
2,创建channel
3,利用channel声名队列
4,利用channel向队列发送消息
基本消息队列的消息接收流程
1,建立connection
2,创建channel
3,利用channel声名队列
4,定义consumer的消费行为handleDelivery
5,利用channel将消费者与队列绑定
快速开始
第一步导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步编写配置文件
1. spring: 2. rabbitmq: 3. host: 47.99.139.160 #主机 4. port: 5672 #端口号 5. virtual-host: / #虚拟主机 6. username: itcast #用户名 7. password: zhangbo123456* #密码
第三步编写测试方法
1. @Autowired 2. RabbitTemplate rabbitTemplate; 3. 4. @Test 5. void contextLoads() { 6. String queueName = "simple.queue"; 7. String message = "hello , spring amqp"; 8. rabbitTemplate.convertAndSend(queueName,message); 9. }
小注:这个消息不会 创建队列,所以要手动创建队列
第四步在Consumer中编写消费逻辑,监听队列
1. @Component 2. public class SpringRabbitListener { 3. 4. @RabbitListener(queues = "simple.queue") 5. public void listenSimplateQueueMessage(String msg) throws InterruptedException{ 6. System.out.println("spring消费者接收到消息:"+msg); 7. } 8. }
消息预取限制
修改application.yml,设置preFetch这个值,可以控制预取消息的上线
spring:
rabbitmq:
host: 47.99.139.160 #主机
port: 5672 #端口号
virtual-host: / #虚拟主机
username: itcast #用户名
password: zhangbo123456* #密码
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一条消息
发布 订阅
发布订阅模式允许将同一消息发送个多个消费者,实现方式是加入了exchange
常见exchange类型包括
- Fanout:广播
- Direct:路由
- Topic:话题
发布订阅-Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(可以用于实现广播模式)
实现思路:
1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定
2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
3,在publisher中编写测试方法,向itcast.fanout发送消息
步骤一 :在consumer服务声名exchange,queue,binding,在consumer服务声名一个配置类,添加@Configuration注解,并声明FanoutExchange,queue和绑定关系对象binding
1. @Configuration 2. public class FanoutConfig { 3. 4. //声名FanoutChange交换机 5. @Bean 6. public FanoutExchange fanoutExchange(){ 7. return new FanoutExchange("itcast.fanout"); 8. } 9. 10. //声名第一个队列 11. @Bean 12. public Queue fanoutQueue1(){ 13. return new Queue("fanout.queue1"); 14. } 15. 16. //绑定队列一和交换机 17. @Bean 18. public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){ 19. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); 20. } 21. //...略,以相同的方式声名第二个队列,并完成绑定 22. }
consumer代码
1. //fanout 模式 2. @RabbitListener(queues = "fanout.queue1") 3. public void listenFanoutQueueMessage(String msg) throws InterruptedException{ 4. System.out.println("spring消费者接收到fanout.queue1消息:"+msg); 5. } 6. 7. //fanout 模式 8. @RabbitListener(queues = "fanout.queue2") 9. public void listenFanoutQueueMessage2(String msg) throws InterruptedException{ 10. System.out.println("spring消费者接收到fanout.queue2消息:"+msg); 11. }
publisher代码
1. //fanout 模式 2. @Test 3. public void testSendFanoutExchange(){ 4. //交换机名称 5. String exchangeName = "itcast.fanout"; 6. //消息 7. String message = "hello , every one"; 8. //发送消息 9. rabbitTemplate.convertAndSend(exchangeName,"",message); 10. }
总结:
交换机的作用?
1,接收publisher发送的消息
2,将消息按照路由规则路由到与之绑定的队列
3,不能缓存消息,路由失败,消息丢失
4,FanoutExchange的会将消息路由到每个绑定的队列
声名队列,交换机,绑定关系的bean是什么?
- queue
- fanoutExchange
- Binding
发布订阅-DirectExchange
Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此称之为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例实现思路
1,利用@RabbitListener声名Exchange,Queue,RoutingKey
2,zaiconsumer服务中,编写两个消费者方法,分别监听direct.queue和direct.queue2
3,在publisher中编写测试方法,向itcast.direct发送消息
consumer
1. //direct模式 2. @RabbitListener(bindings = @QueueBinding( 3. value = @Queue(name = "direct.queue1"), 4. exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), 5. key = {"red","blue"} 6. )) 7. public void listenDirectQueue(String msg){ 8. System.out.println("spring消费者接收到direct.queue1消息:"+msg); 9. } 10. 11. //direct模式 12. @RabbitListener(bindings = @QueueBinding( 13. value = @Queue(name = "direct.queue2"), 14. exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), 15. key = {"red","yellow"} 16. )) 17. public void listenDirectQueue2(String msg){ 18. System.out.println("spring消费者接收到direct.queue2消息:"+msg); 19. }
publisher
1. //direct 模式 2. @Test 3. public void testSendDirectExchange(){ 4. //交换机名称 5. String exchangeName = "itcast.direct"; 6. //消息 7. String message = "hello , smoky"; 8. //发送消息 参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息 9. rabbitTemplate.convertAndSend(exchangeName,"smoky",message); 10. }
总结:
描述direct交换机和fanout交换机的差异?
fanout交换机将消息发送给每一个与之绑定的队列
directii交换机根据RoutingKey判断路由给那个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声名队列和交换机有哪些常见注解?
@Queue
@Exchange
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BIndingKey时可以指定通配符
#:代指0个或多个单词
*:代指一个单词
案例实现思路
1,利用@RabbitListener声名Exchange Queue RoutingKey
2,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3,在publisher中编写测试方法,向itcast.topic发送消息
consumer
1. //topic模式 2. @RabbitListener(bindings = @QueueBinding( 3. value = @Queue("topic.queue1"), 4. exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), 5. key = "chain.#" 6. )) 7. public void listenTopictQueue1(String msg){ 8. System.out.println("spring消费者接收到topic.queue1消息:"+msg); 9. } 10. 11. //topic模式 12. @RabbitListener(bindings = @QueueBinding( 13. value = @Queue("topic.queue2"), 14. exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), 15. key = "*.news" 16. )) 17. public void listenTopictQueue2(String msg){ 18. System.out.println("spring消费者接收到topic.queue2消息:"+msg); 19. }
publisher
1. //direct 模式 2. @Test 3. public void testSendTopictExchange(){ 4. //交换机名称 5. String exchangeName = "itcast.topic"; 6. //消息 7. String message = "今天天气很好呀"; 8. //发送消息 参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息 9. rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message); 10. }
测试发送Object类型消息,消息转换器
说明:在SpringAMQP的发送方法中,接收到的消息类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,用的jdk的序列化器
补充: 使用jdk的序列化器的缺点:1,性能比较差 2,安全性不好,容易出现注入的问题 3,数据长度长,占用额外内存
测试代码
1. //测试Object类型消息 2. @Test 3. public void sendObjectQueue(){ 4. Map<String,Object> msg = new HashMap<>(); 5. msg.put("name","柳岩"); 6. msg.put("age",21); 7. rabbitTemplate.convertAndSend("object.queue",msg); 8. }
Spring的对消息对象的处理是由import org.springframework.messaging.converter.MessageConverter;来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
如果要修改只需要定义一个MessageConverter类型的bean即可,推荐使用JSON的方式序列化
引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
声名一个MessageConverter类型的bean
1. @Bean 2. public MessageConverter jsonMessageConverter(){ 3. return new Jackson2JsonMessageConverter(); 4. }
consumer
引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
consumer服务定义MessageConverter
1. @Bean 2. public MessageConverter jsonMessageConverter(){ 3. return new Jackson2JsonMessageConverter(); 4. }
SpringAMQP中消息的序列化和反序列化是怎么实现的
- 利用MessageConverter实现的,默认是JDK的序列化
- 注意发送方接收必须使用相同的MessageConverter
MQ的一些常见问题
1,消息可靠性:如何确保发送的消息至少被消费一次
2,延迟消息问题:如何实现消息的延迟投递
3,高可用问题:如何避免单点的MQ故障而导致的不可用问题
4,消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
消息可靠性问题
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
- 发送时丢失,
- 生产者发送的消息未到达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
生产者确认机制
RabbitMq提供了publisher confirm机制避免消息发送到MQ的过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功,结果有两种请求
- publisher-confirm,发送者确认
- 消息成功投递到交换机返回ack
- 消息未投递到交换机,返回nack
- publisher-return
- 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突
消费者确认
RabbitMQ支持消费者确认机制,即消费者成功处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息,
而SpringAMQP允许配置三种确认模式
- manual:手动ack,需要在业务代码结束后,调用api发送ack
- auto:自动ack,由spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后会立即被删除