公众号merlinsea
- 定时关单功能
- 生产者相当于下单接口,当用户下单时,发送一个延迟消息到消息队列中
- 一段时间后延迟消息被消费者监听到[延迟队列特点]
- 消费者通过查询订单是否支付/调用第三方支付的查询订单状态接口来确认订单是否支付,然后决定修改订单状态以及消息是否需要重新入队
- 模型抽取
- 生产者相当于下单服务,下单的时候会发送一个订单号的消息给延迟队列
- 消费者相当于定时任务,当延迟消息被消费者监听到的时候,就会检查这笔订单是否支付成功,如果没有支付,则进行关单。
- 完成配置,即配置好消息队列
- 引入依赖
<!--引入AMQP--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- 配置rabbitmq的服务端信息以及交换机,队列,路由key信息
#消息队列 rabbitmq: host: 8.129.113.233 port: 5672 virtual-host: / password: password username: admin #开启手动确认消息 listener: simple: acknowledge-mode: manual #自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue mqconfig: #延迟队列,不能被监听消费 order_close_delay_queue: order.close.delay.queue #延迟队列的消息过期后转发的队列 order_close_queue: order.close.queue #交换机 order_event_exchange: order.event.exchange #进入延迟队列的路由key order_close_delay_routing_key: order.close.delay.routing.key #消息过期,进入释放队列的key,进入死信队列的key order_close_routing_key: order.close.routing.key #消息过期时间,毫秒,测试改为15秒 ttl: 15000
- 编写配置类,读取rabbitmq的相关信息,交给spring容器管理
@Configuration @Data public class RabbitMQConfig { /** * 交换机 */ @Value("${mqconfig.order_event_exchange}") private String eventExchange; /** * 延迟队列 */ @Value("${mqconfig.order_close_delay_queue}") private String orderCloseDelayQueue; /** * 关单队列 */ @Value("${mqconfig.order_close_queue}") private String orderCloseQueue; /** * 进入延迟队列的路由key */ @Value("${mqconfig.order_close_delay_routing_key}") private String orderCloseDelayRoutingKey; /** * 进入死信队列的路由key */ @Value("${mqconfig.order_close_routing_key}") private String orderCloseRoutingKey; /** * 过期时间 */ @Value("${mqconfig.ttl}") private Integer ttl; /** * 消息转换器 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 创建交换机 Topic类型,也可以用dirct路由 * 一般一个微服务一个交换机 * @return */ @Bean public Exchange orderEventExchange(){ return new TopicExchange(eventExchange,true,false); } /** * 延迟队列 */ @Bean public Queue orderCloseDelayQueue(){ Map<String,Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange",eventExchange); args.put("x-dead-letter-routing-key",orderCloseRoutingKey); args.put("x-message-ttl",ttl); return new Queue(orderCloseDelayQueue,true,false,false,args); } /** * 死信队列,普通队列,用于被监听 */ @Bean public Queue orderCloseQueue(){ return new Queue(orderCloseQueue,true,false,false); } /** * 第一个队列,即延迟队列的绑定关系建立 * @return */ @Bean public Binding orderCloseDelayBinding(){ return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseDelayRoutingKey,null); } /** * 死信队列绑定关系建立 * @return */ @Bean public Binding orderCloseBinding(){ return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseRoutingKey,null); } }
- 定时关单服务生产者端代码设计与实现
- 注意点
- 用户下单以后需要投递一个延迟消息msg到延迟队列中
- 延迟消息中的内容需要包含订单流水号,供消费者去查询订单支付信息
- 延迟消息格式 = 消息id + 订单流水号
@Data public class OrderMessage { /** * 消息id */ private Long messageId; /** * 订单号 */ private String outTradeNo; }
- 下单以后发送延迟消息的代码
//发送延迟消息,用于自动关单 OrderMessage orderMessage = new OrderMessage(); orderMessage.setOutTradeNo(orderOutTradeNo); rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),orderMessage);
- 定时关单服务消费者端代码设计与实现
- 查询订单数据库中订单是否存在
- 不存在:则说明消息错误,发送消息ack确认
- 存在: 查询订单数据库中订单是否支付
- 若支付:则说明消息正常已经付款,发送ack确认
- 若未支付: 调用第三方支付平台查询订单接口查询支付状态
- 若支付:则修改订单数据库中的订单状态,并发送ack确认
- 若未支付:则修改订单状态为取消,发送ack确认
- 消费者端的逻辑
@Slf4j @Component @RabbitListener(queues = "${mqconfig.order_close_queue}") public class ProductOrderMQListener { @Autowired private ProductOrderService productOrderService; /** * * 消费重复消息,幂等性保证 * 并发情况下如何保证安全 * * @param orderMessage * @param message * @param channel * @throws IOException */ @RabbitHandler public void closeProductOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException { log.info("监听到消息:closeProductOrder:{}",orderMessage); long msgTag = message.getMessageProperties().getDeliveryTag(); try{ boolean flag = productOrderService.closeProductOrder(orderMessage); //为true说明订单正常处理以付款,发送ack确认 if(flag){ channel.basicAck(msgTag,false); }else { //为false说明订单异常,重新入队 channel.basicReject(msgTag,true); } }catch (IOException e){ log.error("定时关单失败:",orderMessage); channel.basicReject(msgTag,true); } } }
- 查询订单状态并修改订单数据库的业务逻辑
/** * 定时关单 * @param orderMessage * @return */ @Override public boolean closeProductOrder(OrderMessage orderMessage) { ProductOrderDO productOrderDO = productOrderMapper.selectOne(new QueryWrapper<ProductOrderDO>().eq("out_trade_no",orderMessage.getOutTradeNo())); if(productOrderDO == null){ //订单不存在 log.warn("直接确认消息,订单不存在:{}",orderMessage); return true; } if(productOrderDO.getState().equalsIgnoreCase(ProductOrderStateEnum.PAY.name())){ //已经支付 log.info("直接确认消息,订单已经支付:{}",orderMessage); return true; } //向第三方支付查询订单是否真的未支付(开发了AlipayStrategy以后才完善了这部分代码) PayInfoVO payInfoVO = new PayInfoVO(); payInfoVO.setPayType(productOrderDO.getPayType()); payInfoVO.setOutTradeNo(orderMessage.getOutTradeNo()); String payResult = payFactory.queryPaySuccess(payInfoVO); //结果为空,则未支付成功,本地取消订单 if(StringUtils.isBlank(payResult)){ productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.CANCEL.name(),ProductOrderStateEnum.NEW.name()); log.info("结果为空,则未支付成功,本地取消订单:{}",orderMessage); return true; }else { //支付成功,主动的把订单状态改成UI就支付,造成该原因的情况可能是支付通道回调有问题 log.warn("支付成功,主动的把订单状态改成已经支付,造成该原因的情况可能是支付通道回调有问题:{}",orderMessage); productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.PAY.name(),ProductOrderStateEnum.NEW.name()); return true; } }
- 思考:在两个不同的系统之间进行交互的时候,我们首先需要考虑的是这两个系统之间需要设计什么徉的交互协议,即数据传输格式。而设计这种数据传输的格式需要根据这些消息的作用是什么来进行设计!!!
算法训练营永久班上课开始啦,欢迎大家积极报名~
奔跑的小梁,公众号:梁霖编程工具库算法训练营春节价格通知,2023年2月12日