RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange类型
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别
Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。
Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
Topic Exchange
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“”和符号 。匹配0个或多个单词,匹配一个单词。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都 是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时 指定,RabbitMQ 默认的vhost是/。
Broker
表示消息队列服务器实体
Docker安装RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
访问15672端口
https://www.rabbitmq.com/networking.html
SpringCloud整合RabbitMQ
引入RabbitMQ包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
引入RabbitMQ,RabbitAutoConfiguration就会自动生效
给容器中自动配置了RabbitTemplate、AmqpAdmin等等
配置文件
spring.rabbitmq.host=192.168.195.100 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
测试类
@Slf4j @SpringBootTest class GulimallOrderApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; @Test void sendMessageTest() { //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去 OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity); log.info("消息发送成功"); } @Test void createExchange() { //创建了一个Direct类型的交换机 是否持久化 是否自动删除 DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false); amqpAdmin.declareExchange(directExchange); log.info("Exchange创建成功"); } @Test void createQueue() { Queue queue=new Queue("hello-java-Queue",true,false,false); amqpAdmin.declareQueue(queue); log.info("Queue创建成功"); } @Test void createBinding() { //将exchange指定的交换机和Directnation目的地进行绑定,使用routingkey作为路由键 Binding binding=new Binding("hello-java-Queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello-java",null); amqpAdmin.declareBinding(binding); log.info("Binding绑定成功"); } }
先创建交换机,然后创建对队列,绑定路由键,利用rabbitTemplate
发送消息
@RabbitListenter&@RabbitHandler接收消息
@RabbitListenter监听消息
@RabbitListener(queues = {"hello-java-Queue"}) public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) { System.out.println("接收到消息内容:"+message+"内容==》"+content); }
如果有多个客户端,只有一个会收到消息,并且只有当一个消息处理完才会收到下一个消息
如果需要监听一个队列里的多个消息,消息的类型都不一样利用@RabbitHandler
监听hello-java-Queue
队列里不同的消息
@RabbitListener(queues = {"hello-java-Queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { @RabbitHandler public void recieveMessage(Message message, OrderReturnReasonEntity content) { System.out.println("接收到消息内容:"+message+"内容==》"+content); } @RabbitHandler public void recieveMessage2(OrderEntity orderEntity) { System.out.println("接收到消息内容:"+orderEntity); } }
控制器
@Slf4j @Controller public class RabbitController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMq") public String sendMessageTest() { for (int i = 0; i < 10; i++) { if(i%2==0) { //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去 OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity); log.info("消息发送成功"); } else { OrderEntity orderEntity=new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderEntity); log.info("消息发送成功"); } } return ""; } }
RabbitMQ消息确认机制-可靠抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
publisher confirmCallback 确认模式
publisher returnCallback 未投递到 queue 退回模式
consumer ack机制
可靠抵达-ConfirmCallback
如果要使用confirmCallback ,需要配置
#开启发送端确认 spring.rabbitmq.publisher-confirm-type=correlated
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。
- CorrelationData:用来表示当前消息唯一性。
- 生产者只要把消息发送给Broker,消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
@PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /* * @param correlationData 当前消息的唯一关联数据(消息的唯一id) * @param ack 消息是否成功收到 * @param cuase 失败的原因 * */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cuase) { System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase); } }); }
可靠抵达-ReturnCallback
开启发送消息抵达队列的确认
spring.rabbitmq.publisher-returns=true #只要抵达队列,以异步发动有限回调我们这个returnconfig spring.rabbitmq.template.mandatory=true
只有当消息没有抵达队列才会触发方法
@PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /* * @param correlationData 当前消息的唯一关联数据(消息的唯一id) * @param ack 消息是否成功收到 * @param cuase 失败的原因 * */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cuase) { System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { //投递失败的详细信息 回复的状态码 回复的文本内容 当时这个消息给给哪个交换机 当时消息的路由键 @Override public void returnedMessage(Message message, int replaycode, String replytext, String exchange, String routekey) { System.out.println("Fail...message"+message+",[replaycode]"+replaycode+",[replytext]"+replytext+",[exchange]"+exchange+",[routekey]"+routekey); } }); }
可靠抵达-Ack消息确认机制
在不开启手动确认的时候,发送消息突然服务器关机会导致消息丢失,因此需要开启手动模式保证消息的可达性
#手动确认消息达到 spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者手动确认模式下 只要没有明确确认消息,就一直是unached状态,即使关机 消息也不会丢失,会重新变为Ready
@RabbitHandler public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException { System.out.println("接收到消息内容:"+message+"内容==》"+content); long deliveryTag=message.getMessageProperties().getDeliveryTag(); //签收消息 channel.basicAck(deliveryTag,false);//true就是重新发回服务器 System.out.println("消息签收"+deliveryTag); } @RabbitHandler public void recieveMessage2(Message message,OrderEntity orderEntity,Channel channel) throws IOException { System.out.println("接收到消息内容:"+orderEntity); long deliveryTag=message.getMessageProperties().getDeliveryTag(); //签收消息 channel.basicNack(deliveryTag,false,true);// 退货 true就是重新发回服务器 System.out.println("没有签收"+deliveryTag); }
消息处理成功,ack(),接受下一个消息,此消息broker就会移除
消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
如何签收
channel.basicAck(deliveryTag,false) 签收 channel.basicNack(deliveryTag,false,true); 拒签