定时关单

简介: 定时关单

公众号merlinsea


  • 定时关单功能
  • 生产者相当于下单接口,当用户下单时,发送一个延迟消息到消息队列中
  • 一段时间后延迟消息被消费者监听到[延迟队列特点]
  • 消费者通过查询订单是否支付/调用第三方支付的查询订单状态接口来确认订单是否支付,然后决定修改订单状态以及消息是否需要重新入队


  • 模型抽取
  • 生产者相当于下单服务,下单的时候会发送一个订单号的消息给延迟队列
  • 消费者相当于定时任务,当延迟消息被消费者监听到的时候,就会检查这笔订单是否支付成功,如果没有支付,则进行关单。

640.png640.png


  • 完成配置,即配置好消息队列
  • 引入依赖
<!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


  • 配置rabbitmq的服务端信息以及交换机,队列,路由key信息
#消息队列
rabbitmq:
  host: 8.129.113.233
  port: 5672
  virtual-host: /
  password: password
  username: admin
  #开启手动确认消息
  listener:
    simple:
      acknowledge-mode: manual
 #自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue
mqconfig:
  #延迟队列,不能被监听消费
  order_close_delay_queue: order.close.delay.queue
  #延迟队列的消息过期后转发的队列
  order_close_queue: order.close.queue
  #交换机
  order_event_exchange: order.event.exchange
  #进入延迟队列的路由key
  order_close_delay_routing_key: order.close.delay.routing.key
  #消息过期,进入释放队列的key,进入死信队列的key
  order_close_routing_key: order.close.routing.key
  #消息过期时间,毫秒,测试改为15秒
  ttl: 15000


  • 编写配置类,读取rabbitmq的相关信息,交给spring容器管理
@Configuration
@Data
public class RabbitMQConfig {
    /**
     * 交换机
     */
    @Value("${mqconfig.order_event_exchange}")
    private String eventExchange;
    /**
     * 延迟队列
     */
    @Value("${mqconfig.order_close_delay_queue}")
    private String orderCloseDelayQueue;
    /**
     * 关单队列
     */
    @Value("${mqconfig.order_close_queue}")
    private String orderCloseQueue;
    /**
     * 进入延迟队列的路由key
     */
    @Value("${mqconfig.order_close_delay_routing_key}")
    private String orderCloseDelayRoutingKey;
    /**
     * 进入死信队列的路由key
     */
    @Value("${mqconfig.order_close_routing_key}")
    private String orderCloseRoutingKey;
    /**
     * 过期时间
     */
    @Value("${mqconfig.ttl}")
    private Integer ttl;
    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 创建交换机 Topic类型,也可以用dirct路由
     * 一般一个微服务一个交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange(eventExchange,true,false);
    }
    /**
     * 延迟队列
     */
    @Bean
    public Queue orderCloseDelayQueue(){
        Map<String,Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange",eventExchange);
        args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
        args.put("x-message-ttl",ttl);
        return new Queue(orderCloseDelayQueue,true,false,false,args);
    }
    /**
     * 死信队列,普通队列,用于被监听
     */
    @Bean
    public Queue orderCloseQueue(){
        return new Queue(orderCloseQueue,true,false,false);
    }
    /**
     * 第一个队列,即延迟队列的绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseDelayBinding(){
        return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseDelayRoutingKey,null);
    }
    /**
     * 死信队列绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseBinding(){
        return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseRoutingKey,null);
    }
}


  • 定时关单服务生产者端代码设计与实现
  • 注意点
  • 用户下单以后需要投递一个延迟消息msg到延迟队列中
  • 延迟消息中的内容需要包含订单流水号,供消费者去查询订单支付信息


640.jpg


  • 延迟消息格式 = 消息id + 订单流水号


@Data
public class OrderMessage {
    /**
     * 消息id
     */
    private Long messageId;
    /**
     * 订单号
     */
    private String outTradeNo;
}


  • 下单以后发送延迟消息的代码
//发送延迟消息,用于自动关单
OrderMessage orderMessage = new OrderMessage();
orderMessage.setOutTradeNo(orderOutTradeNo);
rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),orderMessage);


  • 定时关单服务消费者端代码设计与实现
  • 查询订单数据库中订单是否存在
  • 不存在:则说明消息错误,发送消息ack确认
  • 存在: 查询订单数据库中订单是否支付
  • 若支付:则说明消息正常已经付款,发送ack确认
  • 若未支付: 调用第三方支付平台查询订单接口查询支付状态
  • 若支付:则修改订单数据库中的订单状态,并发送ack确认
  • 若未支付:则修改订单状态为取消,发送ack确认

640.jpg

  • 消费者端的逻辑


@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.order_close_queue}")
public class ProductOrderMQListener {
    @Autowired
    private ProductOrderService productOrderService;
    /**
     *
     * 消费重复消息,幂等性保证
     * 并发情况下如何保证安全
     *
     * @param orderMessage
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    public void closeProductOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
        log.info("监听到消息:closeProductOrder:{}",orderMessage);
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try{
            boolean flag = productOrderService.closeProductOrder(orderMessage);
            //为true说明订单正常处理以付款,发送ack确认
            if(flag){
                channel.basicAck(msgTag,false);
            }else {
                //为false说明订单异常,重新入队
                channel.basicReject(msgTag,true);
            }
        }catch (IOException e){
            log.error("定时关单失败:",orderMessage);
            channel.basicReject(msgTag,true);
        }
    }
}


  • 查询订单状态并修改订单数据库的业务逻辑
/**
 * 定时关单
 * @param orderMessage
 * @return
 */
@Override
public boolean closeProductOrder(OrderMessage orderMessage) {
    ProductOrderDO productOrderDO = productOrderMapper.selectOne(new QueryWrapper<ProductOrderDO>().eq("out_trade_no",orderMessage.getOutTradeNo()));
    if(productOrderDO == null){
        //订单不存在
        log.warn("直接确认消息,订单不存在:{}",orderMessage);
        return true;
    }
    if(productOrderDO.getState().equalsIgnoreCase(ProductOrderStateEnum.PAY.name())){
        //已经支付
        log.info("直接确认消息,订单已经支付:{}",orderMessage);
        return true;
    }
    //向第三方支付查询订单是否真的未支付(开发了AlipayStrategy以后才完善了这部分代码)
    PayInfoVO payInfoVO = new PayInfoVO();
    payInfoVO.setPayType(productOrderDO.getPayType());
    payInfoVO.setOutTradeNo(orderMessage.getOutTradeNo());
    String payResult = payFactory.queryPaySuccess(payInfoVO);
    //结果为空,则未支付成功,本地取消订单
    if(StringUtils.isBlank(payResult)){
        productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.CANCEL.name(),ProductOrderStateEnum.NEW.name());
        log.info("结果为空,则未支付成功,本地取消订单:{}",orderMessage);
        return true;
    }else {
        //支付成功,主动的把订单状态改成UI就支付,造成该原因的情况可能是支付通道回调有问题
        log.warn("支付成功,主动的把订单状态改成已经支付,造成该原因的情况可能是支付通道回调有问题:{}",orderMessage);
        productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.PAY.name(),ProductOrderStateEnum.NEW.name());
        return true;
    }
}


  • 思考:在两个不同的系统之间进行交互的时候,我们首先需要考虑的是这两个系统之间需要设计什么徉的交互协议,即数据传输格式。而设计这种数据传输的格式需要根据这些消息的作用是什么来进行设计!!!


算法训练营永久班上课开始啦,欢迎大家积极报名~

奔跑的小梁,公众号:梁霖编程工具库算法训练营春节价格通知,2023年2月12日
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
缓存 NoSQL Java
分布式锁有哪些应用场景和实现?
电商网站都会遇到秒杀、特价之类的活动,大促活动有一个共同特点就是访问量激增,在高并发下会出现成千上万人抢购一个商品的场景。虽然在系统设计时会通过限流、异步、排队等方式优化,但整体的并发还是平时的数倍以上,参加活动的商品一般都是限量库存,如何防止库存超卖,避免并发问题呢?分布式锁就是一个解决方案。
790 0
|
JavaScript C# 开发工具
22款Visual Studio Code实用插件推荐
Visual Studio Code是一个轻量级但功能强大的源代码编辑器,轻量级指的是下载下来的Visual Studio Code其实就是一个简单的编辑器,强大指的是支持多种语言的环境插件拓展,也正是因为这种支持插件式安装环境开发让Visual Studio Code成为了开发语言工具中的霸主,让其同时支持开发多种语言成为了可能。俗话说的好:“工欲善其事,必先利其器”,安装一些实用插件对自己日常的开发和工作效率能够大大的提升,避免996从选一款好的开发插件开始。以下是我整理的一些比较实用的Visual Studio Code插件希望对大家有用,大家有更好的插件推荐可在文末留言🤞。
500 0
|
开发者 iOS开发
iOS 源码分析(三):MLeaksFinder
iOS 源码分析(三):MLeaksFinder
858 0
iOS 源码分析(三):MLeaksFinder
|
10月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
10月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
1170 6
|
存储 JavaScript 前端开发
不要滥用Pinia和Redux了!多组件之间交互可以手写一个调度器!
【8月更文挑战第24天】不要滥用Pinia和Redux了!多组件之间交互可以手写一个调度器!
212 2
不要滥用Pinia和Redux了!多组件之间交互可以手写一个调度器!
|
存储 算法 大数据
程序员的职业素养:技术追求与人文关怀的结合
程序员的职业素养:技术追求与人文关怀的结合
246 2
程序员的职业素养:技术追求与人文关怀的结合
|
消息中间件 Java Kafka
【消息中心】kafka消费失败重试10次的问题
【消息中心】kafka消费失败重试10次的问题
1621 0
|
自然语言处理 搜索推荐 API
【推荐100个unity插件之21】unity实现多语言切换功能——Localization插件的使用
【推荐100个unity插件之21】unity实现多语言切换功能——Localization插件的使用
936 0