定时关单

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 定时关单

公众号merlinsea


  • 定时关单功能
  • 生产者相当于下单接口,当用户下单时,发送一个延迟消息到消息队列中
  • 一段时间后延迟消息被消费者监听到[延迟队列特点]
  • 消费者通过查询订单是否支付/调用第三方支付的查询订单状态接口来确认订单是否支付,然后决定修改订单状态以及消息是否需要重新入队


  • 模型抽取
  • 生产者相当于下单服务,下单的时候会发送一个订单号的消息给延迟队列
  • 消费者相当于定时任务,当延迟消息被消费者监听到的时候,就会检查这笔订单是否支付成功,如果没有支付,则进行关单。

640.png640.png


  • 完成配置,即配置好消息队列
  • 引入依赖
<!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


  • 配置rabbitmq的服务端信息以及交换机,队列,路由key信息
#消息队列
rabbitmq:
  host: 8.129.113.233
  port: 5672
  virtual-host: /
  password: password
  username: admin
  #开启手动确认消息
  listener:
    simple:
      acknowledge-mode: manual
 #自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue
mqconfig:
  #延迟队列,不能被监听消费
  order_close_delay_queue: order.close.delay.queue
  #延迟队列的消息过期后转发的队列
  order_close_queue: order.close.queue
  #交换机
  order_event_exchange: order.event.exchange
  #进入延迟队列的路由key
  order_close_delay_routing_key: order.close.delay.routing.key
  #消息过期,进入释放队列的key,进入死信队列的key
  order_close_routing_key: order.close.routing.key
  #消息过期时间,毫秒,测试改为15秒
  ttl: 15000


  • 编写配置类,读取rabbitmq的相关信息,交给spring容器管理
@Configuration
@Data
public class RabbitMQConfig {
    /**
     * 交换机
     */
    @Value("${mqconfig.order_event_exchange}")
    private String eventExchange;
    /**
     * 延迟队列
     */
    @Value("${mqconfig.order_close_delay_queue}")
    private String orderCloseDelayQueue;
    /**
     * 关单队列
     */
    @Value("${mqconfig.order_close_queue}")
    private String orderCloseQueue;
    /**
     * 进入延迟队列的路由key
     */
    @Value("${mqconfig.order_close_delay_routing_key}")
    private String orderCloseDelayRoutingKey;
    /**
     * 进入死信队列的路由key
     */
    @Value("${mqconfig.order_close_routing_key}")
    private String orderCloseRoutingKey;
    /**
     * 过期时间
     */
    @Value("${mqconfig.ttl}")
    private Integer ttl;
    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 创建交换机 Topic类型,也可以用dirct路由
     * 一般一个微服务一个交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange(eventExchange,true,false);
    }
    /**
     * 延迟队列
     */
    @Bean
    public Queue orderCloseDelayQueue(){
        Map<String,Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange",eventExchange);
        args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
        args.put("x-message-ttl",ttl);
        return new Queue(orderCloseDelayQueue,true,false,false,args);
    }
    /**
     * 死信队列,普通队列,用于被监听
     */
    @Bean
    public Queue orderCloseQueue(){
        return new Queue(orderCloseQueue,true,false,false);
    }
    /**
     * 第一个队列,即延迟队列的绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseDelayBinding(){
        return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseDelayRoutingKey,null);
    }
    /**
     * 死信队列绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseBinding(){
        return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseRoutingKey,null);
    }
}


  • 定时关单服务生产者端代码设计与实现
  • 注意点
  • 用户下单以后需要投递一个延迟消息msg到延迟队列中
  • 延迟消息中的内容需要包含订单流水号,供消费者去查询订单支付信息


640.jpg


  • 延迟消息格式 = 消息id + 订单流水号


@Data
public class OrderMessage {
    /**
     * 消息id
     */
    private Long messageId;
    /**
     * 订单号
     */
    private String outTradeNo;
}


  • 下单以后发送延迟消息的代码
//发送延迟消息,用于自动关单
OrderMessage orderMessage = new OrderMessage();
orderMessage.setOutTradeNo(orderOutTradeNo);
rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),orderMessage);


  • 定时关单服务消费者端代码设计与实现
  • 查询订单数据库中订单是否存在
  • 不存在:则说明消息错误,发送消息ack确认
  • 存在: 查询订单数据库中订单是否支付
  • 若支付:则说明消息正常已经付款,发送ack确认
  • 若未支付: 调用第三方支付平台查询订单接口查询支付状态
  • 若支付:则修改订单数据库中的订单状态,并发送ack确认
  • 若未支付:则修改订单状态为取消,发送ack确认

640.jpg

  • 消费者端的逻辑


@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.order_close_queue}")
public class ProductOrderMQListener {
    @Autowired
    private ProductOrderService productOrderService;
    /**
     *
     * 消费重复消息,幂等性保证
     * 并发情况下如何保证安全
     *
     * @param orderMessage
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    public void closeProductOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
        log.info("监听到消息:closeProductOrder:{}",orderMessage);
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try{
            boolean flag = productOrderService.closeProductOrder(orderMessage);
            //为true说明订单正常处理以付款,发送ack确认
            if(flag){
                channel.basicAck(msgTag,false);
            }else {
                //为false说明订单异常,重新入队
                channel.basicReject(msgTag,true);
            }
        }catch (IOException e){
            log.error("定时关单失败:",orderMessage);
            channel.basicReject(msgTag,true);
        }
    }
}


  • 查询订单状态并修改订单数据库的业务逻辑
/**
 * 定时关单
 * @param orderMessage
 * @return
 */
@Override
public boolean closeProductOrder(OrderMessage orderMessage) {
    ProductOrderDO productOrderDO = productOrderMapper.selectOne(new QueryWrapper<ProductOrderDO>().eq("out_trade_no",orderMessage.getOutTradeNo()));
    if(productOrderDO == null){
        //订单不存在
        log.warn("直接确认消息,订单不存在:{}",orderMessage);
        return true;
    }
    if(productOrderDO.getState().equalsIgnoreCase(ProductOrderStateEnum.PAY.name())){
        //已经支付
        log.info("直接确认消息,订单已经支付:{}",orderMessage);
        return true;
    }
    //向第三方支付查询订单是否真的未支付(开发了AlipayStrategy以后才完善了这部分代码)
    PayInfoVO payInfoVO = new PayInfoVO();
    payInfoVO.setPayType(productOrderDO.getPayType());
    payInfoVO.setOutTradeNo(orderMessage.getOutTradeNo());
    String payResult = payFactory.queryPaySuccess(payInfoVO);
    //结果为空,则未支付成功,本地取消订单
    if(StringUtils.isBlank(payResult)){
        productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.CANCEL.name(),ProductOrderStateEnum.NEW.name());
        log.info("结果为空,则未支付成功,本地取消订单:{}",orderMessage);
        return true;
    }else {
        //支付成功,主动的把订单状态改成UI就支付,造成该原因的情况可能是支付通道回调有问题
        log.warn("支付成功,主动的把订单状态改成已经支付,造成该原因的情况可能是支付通道回调有问题:{}",orderMessage);
        productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.PAY.name(),ProductOrderStateEnum.NEW.name());
        return true;
    }
}


  • 思考:在两个不同的系统之间进行交互的时候,我们首先需要考虑的是这两个系统之间需要设计什么徉的交互协议,即数据传输格式。而设计这种数据传输的格式需要根据这些消息的作用是什么来进行设计!!!


算法训练营永久班上课开始啦,欢迎大家积极报名~

奔跑的小梁,公众号:梁霖编程工具库算法训练营春节价格通知,2023年2月12日
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6月前
|
弹性计算 安全 机器人
一键搞定定时自动化通知
您是否经常忘了需要每周要填报工作时长?您的团队是否需要每月定时盘点HC?您是否每月末都在工作群提醒大家更新OKR? 这些简单的定时任务是不是经常会忘记或者占用您的精力?如果你也有这些烦恼,是时候来试试这个应用与数据集成平台——阿里云计算巢AppFlow了,它能够像一个神经中枢,高效地串联起所有关键数据流,并且能够巧妙地运用现代化的通讯工具如钉钉群机器人,实现定时消息通知,让每一个重要信息都能准时送达,不再因为简单重复的定时工作而占用您的时间和精力~
221 0
|
消息中间件 NoSQL Java
Springboot 指定重发的次数和延迟时间,定时异步执行 重发任务
Springboot 指定重发的次数和延迟时间,定时异步执行 重发任务
893 0
Springboot 指定重发的次数和延迟时间,定时异步执行 重发任务
|
2月前
|
Python
定时提醒程序
【9月更文挑战第10天】
53 9
|
3月前
|
Kubernetes Unix API
在K8S中,如果解决周期性任务?
在K8S中,如果解决周期性任务?
|
资源调度 运维 Java
定时任务报警通知解决方案详解
随着微服务和云计算的兴起,定时任务技术也是发展迅速,不仅能做单机的定时任务,而且在分布式系统下应用也很广泛,成为了业务做兜底、数据处理的第一选择。
2548 3
定时任务报警通知解决方案详解
|
消息中间件 存储 算法
【视频】定时消息 | 学习笔记
快速学习【视频】定时消息
138 0
【视频】定时消息 | 学习笔记
|
调度 Python
定时功能
利用Py简单实现定时功能
|
网络协议
10w定时任务,如何高效触发超时
很多时候,业务有定时任务或者定时超时的需求,当任务量很大时,可能需要维护大量的timer,或者进行低效的扫描。
809 0
|
监控
【新功能发布】定时暂停应用分组报警规则的通知发送
云监控的应用分组,旨在帮助用户按照业务维护管理监控报警。这次推出的报警规则定时暂停功能,可以在您的业务执行变更、升级,造成指标符合预期的波动时,定时关闭报警,不发送报警通知。
1487 0