应用场景:
- 异步处理:如果用户注册后需要给用户发送邮件和短信,那么就可以使用消息队列中间件进行异步发送,因为发送邮件和发送短信这两个操作没有任何的关联顺序。
- 应用解耦:例如订单系统调用库存系统的时候,可以加个中间件做到应用解耦。
- 流量控制:流量控制也叫流量削峰,是说当高并发的时候或者是秒杀的场景下,可以将流量都放到消息队列中,然后处理秒杀的业务再以此进行处理,避免高并发下服务宕机。
消息中间件的模式:
- 点对点发布模式:消息发送者发布消息到消息代理,消息代理将消息放入队列,消息接收者从队列中获取消息,之后队列移除消息,消息可以有多个接收者进行处理
- 发布订阅模式:消息发送者发送到主题,多个消息接收者对这个主题进行监听,那么就会在消息到达的时候同时受到消息
概念
- Publisher生产者,会与代理服务器Broker建立一条长连接,并且开辟出很多信道Chanel,通过这些Chanel将消息发送给Broker
- Message 消息(头,路由键 + 体)
- Broker:代理服务器(有多个交换机多个队列)
- VHost:用于做隔离,例如隔离生不同语言之间的消息管理或者是生产环境和开发环境之间的隔离
- Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
- Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
- Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
- Queue队列,存储消息
- Queue队列,存储消息
- Queue队列,存储消息
- Consumer消费者,一个消费者也可以与代理服务器Broker建立一条长连接,并开辟很多Chanel,这些Chanel可以监听队列中的消息
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
安装
[root@localhost ~]# docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
[root@localhost ~]# docker update rabbitmq --restart=always rabbitmq
交换机类型:
- Direct 直连,交换机会根据路由键精确匹配到一个队列上
- fanout广播,交换机会给已经绑定的所有队列群发消息
- topic 根据模糊匹配规则#和*进行路由匹配发送给队列消息,#代表多个字符, * 代表必须要有一个字符
SpringBoot整合RabbitMQ
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在主启动类中开启@EnableRabbit
配置yml文件
spring: rabbitmq: port: 5672 host: 192.168.100.10 virtual-host: / publisher-confirms: true #开启手动确认机制 publisher-returns: true template: mandatory: true listener: simple: acknowledge-mode: manual
配置mq配置类,不选择的话是使用的jdk虚拟化而是使用JackSon的虚拟化,保证在传输对象到队列的时候显示json数据
@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
测试,创建exchange、binding、queue、message
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class GulimallOrderApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; @Test public void sendMsg() { String msg = "hello world"; rabbitTemplate.convertAndSend("hello.java.exchange","hello.java.exchange",msg); log.info("发送的消息是{}",msg); } @Test public void createExchange() { amqpAdmin.declareExchange(new DirectExchange("hello.java.exchange",true,false)); log.info("exchange创建成功hello.java.exchange"); } @Test public void createQueue() { Queue queue = new Queue("hello.java.queue",true,false,false); amqpAdmin.declareQueue(queue); log.info("queue创建成功hello.java.queue"); } @Test public void createBinding() { Binding binding = new Binding("hello.java.queue", Binding.DestinationType.QUEUE, "hello.java.exchange", "hello.java.exchange", null ); amqpAdmin.declareBinding(binding); log.info("binding成功"); }
两个重要的注解:
- @RabbitListener(queues = {“hello.java.queue”})这个注解可以标识在类和方法中
- @RabbitHandler这个注解只能放在方法上,并且搭配@RabbitListener(queues = {“hello.java.queue”})作为类注解来使用,使用场景是当消息中有不同类型的消息的时候,我们需要让不同的方法接收不同的消息进行处理,这时候我们可是用@RabbitHandler作为方法注解来重载不同的方法,来解决这个问题
消息确认机制
保证消息不丢失,可靠抵达,可以使用事务机制,但是性能会下降250倍,为此可以引入确认机制
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
消息确认机制配置
# RabbitMQ配置 spring.rabbitmq.host=192.168.77.130 spring.rabbitmq.port=5672 # 虚拟主机配置 spring.rabbitmq.virtual-host=/ # 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true # 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration public class MyRabbitConfig { private RabbitTemplate rabbitTemplate; @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) * */ // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); } }
创建一个订单的延迟队列和普通队列并设置延迟队列的路由键,当消息过期以后路由到取消队列
@Configuration public class MyRabbitConfig { /** * 使用JSON序列化机制,进行消息转换 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // @RabbitListener(queues = "stock.release.stock.queue") // public void handle(Message message) { // // } /** * 库存服务默认的交换机 * @return */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } /** * 普通队列 * @return */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments Queue queue = new Queue("stock.release.stock.queue", true, false, false); return queue; } /** * 延迟队列 * @return */ @Bean public Queue stockDelay() { HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; } /** * 交换机与普通队列绑定 * @return */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map<String, Object> arguments Binding binding = new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); return binding; } /** * 交换机与延迟队列绑定 * @return */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }
向指定交换机和指定队列发送消息
rabbitTemplate.convertAndSend("stock-event-exchange","stock.release",orderEntityTo);
创建监听器监听发来的消息
@Service @RabbitListener(queues = "order.release.order.queue") public class OrderCloseListener { @Autowired OrderService orderService; @RabbitHandler public void orderClose(OrderEntity orderEntity, Channel channel, Message message) throws IOException { try { orderService.orderClose(orderEntity); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }