使用场景
对于Mq中可能产生堆积消息的情景来说,如果消息都是不需要实时处理的,那可以等待消费者慢慢的去处理就行了,然后实际使用中我们会遇到这样一种情况,就是mq队列堆积消息很多,但是每条消息处理的有的很慢有的很快,所以此时这种场景就比较适合使用mq的优先级来保证消息的提前消费,让处理慢的消息最后消费
下面使用java链接mq演示一下消息的处理
初始化优先级队列
# 交换机 public static final String EXCHANGE_TEST = "test";# 路由key public static final String PRIORITY_ROUTE_KEY_TEST = "priority.test";# 队列 public static final String QUEUE_PRIORITY = "queue.priority.1"; @Bean public TopicExchange testExchange() { return new TopicExchange(EXCHANGE_TEST); } # 声名优先级队列,制定最大优先级10 @Bean public Queue priority_Queue() { Map<String, Object> map = new HashMap<String, Object>(1); map.put("x-max-priority", 10); return new Queue(QUEUE_PRIORITY, true, false, false, map); } # 队列与交换机的绑定 @Bean public Binding myBindingPriority() { return new Binding(QUEUE_PRIORITY, Binding.DestinationType.QUEUE, EXCHANGE_TEST, PRIORITY_ROUTE_KEY, null); }
消息生产者代码
private RabbitTemplate rabbitTemplate; public void sendPriorityMsg(String exchange, String routeKey, Object msg, Integer priority) { MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); //设置编码 messageProperties.setContentEncoding("utf-8"); //设置过期时间10*1000毫秒 messageProperties.setPriority(priority); return message; }; rabbitTemplate.convertAndSend(exchange, routeKey, msg, messagePostProcessor); }
发送消息
public String sendMsg() { // 发送了两个优先级的任务 msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1", 1); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-2", 1); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-3", 1); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-4", 1); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-5", 1); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-5", 5); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-4", 5); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-3", 5); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-2", 5); msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-1", 5); return "success"; }
启动程序发送消息
此时rabbitmq的管理页面可以看到队列已经创建成功
消费者也是有一个,然后与test交换机进行了绑定
下面开始发送消息,因为我们目前没有订阅队列,也就是和图中类似没有消费者产生,所以现在队列中堆积了10条消息
添加消费者
@RabbitListener(queues = RabbitConfig.QUEUE_PRIORITY) @RabbitHandler public void priority(String content) { log.info("交换机 test 的消息:" + content); }
此时重新启动程序,消费者消息会自动接受处理了,打印结果如下
此时我们可以明显的看到优先级5的消息优先被处理了,所以到此为止,一个简单的mq优先级消息队列就结束了
步骤
1:创建交换机,初始化指定最大优先级的队列
2: 创建生产者发送消息的优先级指定信息
3: 创建消费者监听
相关源码已经提交github或者点击原文
https://github.com/TianPuJun/springboot-demo/tree/master/springboot-rabbitmq