第一步:使用之前先安装好RabbitMQ,建议安装在linux系统下
安装配置RabbitMQ:https://blog.csdn.net/qq_33450681/article/details/85339315
第二步:在配置文件下配置
rabbitmq: host: 192.168.0.100 port: 5672 virtual-host: /mall username: mall password: mall publisher-confirms: true #如果对异步消息需要回调必须设置为true
浏览器访问http://192.168.0.100:15672/#/
第三步:业务中使用发送消息
@Autowired private OmsOrderSettingMapper orderSettingMapper; @Autowired private AmqpTemplate amqpTemplate; /** * 发送检查支付结果的消息队列 * @param orderSn * @param count */ @Override public void sendDelayPaymentCheck(String orderSn, int count) { //获取订单超时时间 OmsOrderSetting orderSetting = orderSettingMapper.selectByPrimaryKey(1L); long delayTimes = orderSetting.getNormalOrderOvertime() * 60 * 1000; //将需要发送的数据封装到hashmap中 HashMap<Object, Object> hashMap = new HashMap<>(); hashMap.put("out_trade_no",orderSn); hashMap.put("count",count); //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_PAY_CANCEL.getExchange(), QueueEnum.QUEUE_PAY_CANCEL.getRouteKey(), hashMap, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); }
第四步:定义QueueEnum枚举
/** * 支付通知队列 */ QUEUE_PAY_CANCEL("mall.pay.direct","mall.pay.cancel","mall.pay.cancel") /** * 交换名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } public String getExchange() { return exchange; } public String getName() { return name; } public String getRouteKey() { return routeKey; }
第五步:配置
RabbitMQ参数配置:
使用一个RabbitMQ需要配置以下几个重要的参数
1.虚拟主机名称(Virtual host name),这个参数不是真正的IP地址或者域名,它是RabbitMQ内部的一个虚拟主机,就像是电脑安装了N台虚拟机,对外的名称一般是“/xxxx".
2.交换机名(Exchanges name):顾名思义,就是把生产者送来的消息来进行分发给下游的多个消费者,相当一个内部软交换机。交换机的类型有fanout,direct,topic,header,fanout类型类似以太网交换机的广播模式,把送来的消息给每个下游队列。direct类似单播(使用routingkey来指定目的队列),topic交换机类似组播,把消息传递给下面同一主题的队列,header交换机则忽略掉routingkey,使用hash数据结构来进行匹配和转发。
3.routingkey :前面讲过了,交换机在进行消息转发时候,要使用routingkey为关键字进行转发。
4.队列名称:可以为不同的消费者指定不同的队列,可以对消息进行分类到不同的队列进行转发。
配置类
/** * 消息队列配置 * Created by macro on 2018/9/14. */ @Configuration public class RabbitMqConfig { /** * 支付队列 * @return */ @Bean public Queue payQueue() { return new Queue(QueueEnum.QUEUE_PAY_CANCEL.getName()); } /** * 绑定支付交互机 * @return */ @Bean DirectExchange payDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_PAY_CANCEL.getExchange()) .durable(true) .build(); } /** * 将支付队列绑定到支付交互机 * @param payDirect * @param payQueue * @return */ @Bean Binding payBinding(DirectExchange payDirect,Queue payQueue){ return BindingBuilder .bind(payQueue) .to(payDirect) .with(QueueEnum.QUEUE_PAY_CANCEL.getRouteKey()); }
第六步:处理支付信息
package com.macro.mall.portal.component; import com.macro.mall.model.PaymentInfo; import com.macro.mall.portal.service.PaymentService; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; /** * 支付的处理者 */ @Component @RabbitListener(queues = "mall.pay.cancel") public class PayReceiver { @Autowired PaymentService paymentService; @RabbitHandler public void handle(HashMap mapMessage){ try { String outTradeNo = mapMessage.get("out_trade_no").toString(); int count = Integer.parseInt(mapMessage.get("count").toString()); // 如果没有支付成功,再次发送延迟检查队列 if (count > 0) { // 进行支付状态检查 System.out.println("正在进行第" + (6 - count) + "支付结果次检查"); //调用alipayClient接口,根据out_trade_no查询支付信息 PaymentInfo paymentInfo = paymentService.checkPaymentResult(outTradeNo); Thread thread = new Thread(); thread.start(); Thread.sleep(10000); //判断是否已经支付成功 if (paymentInfo.getPaymentStatus()!=null&&(paymentInfo.getPaymentStatus().equals("TRADE_SUCCESS") || paymentInfo.getPaymentStatus().equals("TRADE_FINISHED"))) { // 交易成功或者失败,记录交易状态 System.out.println("检查交易结果成功,记录交易状态。。。");// 修改支付的状态信息 // 修改支付信息 boolean b = paymentService.checkPaymentStatus(outTradeNo); if(!b){ //修改为已支付 paymentService.updatePayment(paymentInfo.getCallbackContent(),outTradeNo,paymentInfo.getAlipayTradeNo()); // 发送系统消息,出发并发商品支付业务消息队列 paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),paymentInfo.getAlipayTradeNo()); } } else {//未支付 // 再次进行延迟检查 System.out.println("正在进行第" + (6 - count) + "支付结果次检查,检查用户尚未付款成功,继续巡检"); paymentService.sendDelayPaymentCheck(outTradeNo, count - 1); } } else { System.out.println("支付结果次检查次数耗尽,支付未果。。。"); } } catch (Exception e) { } } }