Springboot----项目整合微信支付(利用RabbitMQ延迟队列处理用户退款)

简介: Springboot----项目整合微信支付(利用RabbitMQ延迟队列处理用户退款)

一:🧸问题引入

用户下单之后,很有可能因为某些原因需要申请退款,这时候我们就需要为用户提供退款接口。有了前面的处理流程,针对退款功能我们也能很容易实现,因为微信支付已经提供了一系列接口供我们调用,我们只需要专注自己的业务处理即可。

退款功能的实现其实和用户下单差不多,也需要设置相关参数然后发送给微信支付后台,参数设置中微信支付订单号和商户订单号需要二选一,我选取的是商户订单号,此外,还需要商户提供一个退款编号,这个编号需要自己在后台自动生成。

🚨注意事项:

  1. 交易时间超过一年的订单无法提交退款
  2. 微信支付退款支持单笔交易分多次退款(不超50次),多次退款需要提交原支付订单的商户订单号和设置不同的退款单号。
  3. 错误或无效请求频率限制:6qps,即每秒钟异常或错误的退款申请请求不超过6次
  4. 每个支付订单的部分退款次数不能超过50次
  5. 申请退款接口的返回仅代表业务的受理情况,具体退款是否成功,需要通过退款查询接口获取结果
  6. 一个月之前的订单申请退款频率限制为:5000/min
  7. 同一笔订单多次退款的请求需相隔1分钟

二:🧸处理流程

退款时序图:

2.1:用户申请退款

当用户点击退款申请之后,后台接到请求并在组装好参数之后向微信支付后台发送退款申请,这时候微信支付会返回一个JSON数据,里面包含微信支付退款单号、商户退款单号,微信支付订单号、商户订单号、退款状态等信息,见下图:

这时候的退款状态一般为退款中,这时候我们将数据库中订单状态更新为退款中,假如返回状态为退款成功则直接将订单状态改为退款成功并记录日志即可,当然,还有可能是退款异常的情况。

2.2:处理微信退款回调通知

和用户下单一样,用户申请退款后微信支付后台也会发送回调通知通知商户,这时候就需要我们对该回调进行解密并获取信息后返回正确的状态码给微信支付后台。其处理流程和前面提到的支付回调通知大同小异,需要注意的是回调通知地址必须要能被外网访问,所以我们就需要借助内网穿透工具,要是还设置了过滤器或者拦截器需要放行该请求,不然微信支付后台是通知不到商户的,而且一段时间内微信支付后台还会多次发送回调通知。

微信支付中是这样介绍的:同样的通知可能会多次发送给商户系统。商户系统必须能够正确处理重复的通知。 推荐的做法是,当商户系统收到通知进行处理时,先检查对应业务数据的状态,并判断该通知是否已经处理。如果未处理,则再进行处理;如果已处理,则直接返回结果成功。在对业务数据进行状态检查和处理之前,要采用数据锁进行并发控制,以避免函数重入造成的数据混乱,因此我们还需要注意接口的幂等性规则。

2.3:商户主动查询退款

由于某些原因商户可能会接收不到微信支付发给商户的回调通知,这时候就需要商户主动调用退款查询接口查询退款状态。但是退款一般存在延时,建议一分钟之后再进行查询。

当面对大量退款请求时候,假如采用定时器策略去定时查询数据库查看哪些退款订单还没处理成功这样不仅效率低,还会加重数据库的负担,因此我还是采用RabbitMQ的延迟队列进行处理。当用户申请退款之后,假如订单状态为退款中就将该订单加入延迟队列,存活时间为5分钟。5分钟之后先在数据库查看该订单状态,假如退款已处理完成则不做处理,假如订单仍为退款中则向微信支付后台主动发送查询请求,若返回结果也是处理中则将消息再次入队,5分钟后再次查询,假如订单已处理完成,则更新订单状态并记录相关日志。

三:🧸代码实现

3.1:controller层

/**
* 订单退款
* @param order
* @return
*/
@ApiOperation("退款申请")
@PostMapping("/refund")
public R<String> refund(@RequestBody Orders order) throws Exception {
    log.info("处理退款申请...");
    wxPayService.refund(order);
    return R.success("处理退款成功,订单正在退款中...");
}
/**
 * 获取退款通知
 * @param request
 * @param response
 * @return
 */
@ApiOperation("退款结果回调通知")
@PostMapping("/refunds/notify")
public String getRefundNotify(HttpServletRequest request, HttpServletResponse response){
    log.info("处理退款结果回调通知...");
    Gson gson = new Gson();
    //应答体
    Map<String,String> map = new HashMap<>();
    try {
        //处理通知参数
        String body = HttpUtils.readData(request);
        //回调通知的验签与解密
        String wechatPaySerial = request.getHeader(WECHAT_PAY_SERIAL);
        String apiV3Key = wxPayConfig.getApiV3Key();
        String nonce = request.getHeader(WECHAT_PAY_NONCE); // 请求头Wechatpay-Nonce
        String timestamp = request.getHeader(WECHAT_PAY_TIMESTAMP); // 请求头Wechatpay-Timestamp
        String signature = request.getHeader(WECHAT_PAY_SIGNATURE); // 请求头Wechatpay-Signature
        WechatPay2ValidatorForRequest wechatPay2ValidatorForRequest = new WechatPay2ValidatorForRequest(wechatPaySerial,apiV3Key,nonce, timestamp, signature, body,verifier);
        Notification notification = wechatPay2ValidatorForRequest.notificationHandler();
        log.info("notification===>{}",notification.toString());
        String eventType = notification.getEventType();
        if(eventType.length() == 0){
            log.error("退款结果回调通知验签失败");
            response.setStatus(500);
            map.put("code","ERROR");
            map.put("message","失败");
            return gson.toJson(map);
        }
        log.info("退款结果回调通知验签成功");
        //处理订单
        wxPayService.processRefund(notification);
        //应答响应码(200或者204表示成功)
        response.setStatus(200);
        map.put("code","SUCCESS");
        map.put("message","成功");
        return gson.toJson(map);
    } catch (Exception e) {
        e.printStackTrace();
        response.setStatus(500);
        map.put("code","ERROR");
        map.put("message","失败");
        return gson.toJson(map);
    }
}

3.2:service层

/**
 * 退款接口API
 * @param order
 */
 @Transactional(rollbackFor = Exception.class)
 @Override
 public void refund(Orders order) throws Exception {
     log.info("调用退款接口API");
     //调用统一退款API
     String url = wxPayConfig.getDomain().concat(WxApiType.DOMESTIC_REFUNDS.getType());
     HttpPost httpPost = new HttpPost(url);
     httpPost.addHeader("Accept","application/json");
     httpPost.addHeader("Content-type","application/json;charset=utf-8");
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     ObjectMapper objectMapper = new JacksonObjectMapper();
     ObjectNode rootNode = objectMapper.createObjectNode();
     String refundNumber = null;
     Orders order_temp = ordersService.getById(order.getId());
     if(order_temp.getRefundNumber() != null) {
         refundNumber = order_temp.getRefundNumber();
     } else {
         refundNumber = String.valueOf(IdWorker.getId());
         //记录订单退款编号及退款时间
         LambdaUpdateWrapper<Orders> luw = new LambdaUpdateWrapper<>();
         luw.eq(Orders::getNumber,order.getNumber());
         luw.set(Orders::getRefundNumber,refundNumber);
         luw.set(Orders::getRefundTime, LocalDateTime.now());
         ordersService.update(luw);
     }
     rootNode.put("out_trade_no",order.getNumber())
             .put("out_refund_no", refundNumber)
             .put("notify_url",wxPayConfig.getNotifyDomain().concat(WxNotifyType.REFUND_NOTIFY.getType()));
     rootNode.putObject("amount")
             .put("refund",order.getAmount())
             .put("total",order.getAmount())
             .put("currency","CNY");
     objectMapper.writeValue(bos,rootNode);
     httpPost.setEntity(new StringEntity(bos.toString(StandardCharsets.UTF_8),"UTF-8"));
     CloseableHttpResponse response = wxPayClient.execute(httpPost);
     String bodyAsString = EntityUtils.toString(response.getEntity());
     JSONObject body = JSON.parseObject(bodyAsString);
     String refundStatus = (String) body.get("status");
     if(WxRefundStatus.PROCESSING.getType().equals(refundStatus)) {
         //退款处理中
         log.info("退款处理中...");
         //更新订单状态
         ordersService.updateStatusByOrderNo(order.getNumber(),refundStatus);
         //更新日志信息
         paymentInfoService.updateStatus(order.getNumber(),refundStatus);
         //将退款编号存入退款延迟队列,延迟时间为5分钟
         log.info("退款编号:{}进入延迟队列...",refundNumber);
         delayProducer.publish(refundNumber, order.getNumber(),
                 DelayMessageConfig.DELAY_EXCHANGE_NAME,DelayMessageConfig.ROUTING_KEY_REFUND,1000*60*5);
     }
     if (WxRefundStatus.SUCCESS.getType().equals(refundStatus)){
         //退款处理成功
         log.info("退款处理成功");
         //更新订单状态
         ordersService.updateStatusByOrderNo(order.getNumber(),"REFUND_SUCCESS");
         //更新日志信息
         paymentInfoService.updateStatus(order.getNumber(),"REFUND_SUCCESS");
     }
     if (WxRefundStatus.ABNORMAL.getType().equals(refundStatus)){
         //退款处理异常
         log.warn("退款处理异常");
         //更新订单状态
         ordersService.updateStatusByOrderNo(order.getNumber(),refundStatus);
         //更新日志信息
         paymentInfoService.updateStatus(order.getNumber(),"REFUND_ABNORMAL");
     }
 }
 /**
  * 处理退款通知结果
  */
 @Transactional(rollbackFor = Exception.class)
 @Override
 public void processRefund(Notification notification) {
     String decryptData = notification.getDecryptData();
     JSONObject data = JSON.parseObject(decryptData);
     //获取订单号
     String orderNumber = (String) data.get("out_trade_no");
     //获取支付状态
     String refundState = (String) data.get("refund_status");
     /*在对业务数据进行状态检查和处理之前,
     要采用数据锁进行并发控制,
     以避免函数重入造成的数据混乱*/
     //尝试获取锁:
     // 成功获取则立即返回true,获取失败则立即返回false。不必一直等待锁的释放
     if(lock.tryLock()) {
         try {
             //处理重复通知
             //接口调用的幂等性:无论接口被调用多少次,产生的结果是一致的
             Integer status = ordersService.getOrderStatus(orderNumber);
             if(status == null || status == 7) {   //该订单已经退款成功
                 log.info("该退款已处理完成");
                 return;
             }
             if(WxRefundStatus.SUCCESS.getType().equals(refundState)) {
                 //更新订单状态
                 log.info("退款结果回调成功,退款结果为SUCCESS...");
                 ordersService.updateStatusByOrderNo(orderNumber,"REFUND_SUCCESS");
                 //记录日志
                 paymentInfoService.updateStatus(orderNumber,"REFUND_SUCCESS");
             }
             else {
                 //更新订单状态
                 log.info("退款结果回调成功,回调结果不为SUCCESS...");
                 ordersService.updateStatusByOrderNo(orderNumber,refundState);
                 //记录日志
                 paymentInfoService.updateStatus(orderNumber,refundState);
             }
         } finally {
             //主动释放锁
             lock.unlock();
         }
     }
 }
 @Override
 public String queryRefund(String refundNo) throws Exception {
     log.info("查询退款接口的调用==>{}", refundNo);
     //构建请求链接
     String url = String.format(WxApiType.DOMESTIC_REFUNDS_QUERY.getType(),refundNo);
     url = wxPayConfig.getDomain().concat(url);
     URIBuilder uriBuilder = new URIBuilder(url);
     HttpGet httpGet = new HttpGet(uriBuilder.build());
     httpGet.addHeader("Accept","application/json");
     CloseableHttpResponse response = wxPayClient.execute(httpGet);
     return EntityUtils.toString(response.getEntity());
 }
 /**
  * 查询订单退款状态
  * 如果超过5分钟未退款成功转入退款异常
  * 如果已经退款成功则更新订单状态
  * @param refundNo
  */
 @Transactional(rollbackFor = Exception.class)
 @Override
 public void checkRefundStatus(String refundNo) throws Exception {
     log.info("根据退款编号核实退款状态==>{}",refundNo);
     String result = this.queryRefund(refundNo);
     JSONObject resultJson = JSON.parseObject(result);
     String refundStatus = (String) resultJson.get("status");
     //获取订单编号
     LambdaUpdateWrapper<Orders> lqw = new LambdaUpdateWrapper<>();
     lqw.eq(Orders::getRefundNumber,refundNo);
     Orders order = ordersService.getOne(lqw);
     String orderNo = order.getNumber();
     //判断订单状态
     if(WxRefundStatus.SUCCESS.getType().equals(refundStatus)) {
         log.warn("核实到订单已退款成功,订单号==>{}",orderNo);
         //更新订单状态
         ordersService.updateStatusByOrderNo(orderNo,"REFUND_SUCCESS");
         //更新支付日志
         paymentInfoService.updateStatus(orderNo,"REFUND_SUCCESS");
     }
     else if(WxRefundStatus.PROCESSING.getType().equals(refundStatus)) {
         log.info("退款正在处理中,退款编号==>{}",refundNo);
         //将退款消息存入退款延迟队列,5分钟之后再次查询
         delayProducer.publish(order.getRefundNumber(),order.getRefundNumber(),
                 DelayMessageConfig.DELAY_EXCHANGE_NAME,DelayMessageConfig.ROUTING_KEY_REFUND,1000*60*5);
     }
     else{
         log.warn("核实到订单退款异常,订单号==>{}",orderNo);
         if(!WxRefundStatus.ABNORMAL.getType().equals(refundStatus)){
             log.info("更新订单信息及日志...");
             //更新订单状态
             ordersService.updateStatusByOrderNo(orderNo,WxRefundStatus.ABNORMAL.getType());
             //更新支付日志
             paymentInfoService.updateStatus(orderNo,WxRefundStatus.ABNORMAL.getType());
         }
     }
 }

3.3:RabbitMQ代码

/**
 * 监听退款延迟队列
 * @param refundNo
 */
 @RabbitListener(queues = {"plugin.delay.refund.queue"})
 public void refundDelayQueue(String refundNo, Message message, Channel channel) throws Exception {
     log.info("退款延迟队列开始消费...");
     try {
         //处理退款信息
         wxPayService.checkRefundStatus(refundNo);
         //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
         log.info("消息接收成功");
     } catch (Exception e) {
         e.printStackTrace();
         //消息重新入队
         channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
         log.info("消息接收失败,重新入队");
     }
 }

说明:关于延迟队列的使用这里就不再赘述了,要是有什么疑问可以查看这篇文章,里面有详细介绍,包括原理以及配置信息等,当然也可以查看我之前关于订单处理的文章,思路都是差不多的,最大的不同就是退款处理多了个重新入队操作(查询到订单状态仍然为退款中时候),至此这一系列文章就完结了,后续项目还有优化我会继续发文,欢迎大家关注,有什么问题可以私信博主。

四:🧸友情链接

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
1835 5
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
1052 4
|
10月前
|
监控 Java 关系型数据库
Spring Boot整合MySQL主从集群同步延迟解决方案
本文针对电商系统在Spring Boot+MyBatis架构下的典型问题(如大促时订单状态延迟、库存超卖误判及用户信息更新延迟)提出解决方案。核心内容包括动态数据源路由(强制读主库)、大事务拆分优化以及延迟感知补偿机制,配合MySQL参数调优和监控集成,有效将主从延迟控制在1秒内。实际测试表明,在10万QPS场景下,订单查询延迟显著降低,超卖误判率下降98%。
455 5
|
12月前
|
小程序 JavaScript Java
基于SpringBoot的智慧停车场微信小程序源码分享
智慧停车场微信小程序主要包含管理端和小程序端。管理端包括停车场管理,公告信息管理,用户信息管理,预定信息管理,用户反馈管理等功能。小程序端包括登录注册,预约停车位,停车导航,停车缴费,用户信息,车辆信息,钱包充值,意见反馈等功能。
576 5
基于SpringBoot的智慧停车场微信小程序源码分享
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
429 7
|
移动开发 安全 JavaScript
SpringBoot接入微信JSSDK,看这篇妥妥的
这篇教程详细介绍了如何在Spring Boot项目中接入微信JSSDK,实现H5页面的自定义分享和调用相册选取图片等功能。文章首先通过对比理想与现实的分享效果,引出了接入微信JSSDK的必要性。接着,作者提供了GitHub和Gitee上的项目源码链接,并逐步讲解了整个接入过程的关键步骤,包括配置文件、主要类和方法的实现细节,以及必要的微信公众号设置。此外,还特别强调了几个常见问题及其解决方案,如域名绑定、IP白名单设置和签名验证等。最后,通过实际测试验证了功能的正确性。适合初学者快速上手微信JSSDK接入。
431 8
SpringBoot接入微信JSSDK,看这篇妥妥的
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
471 0
|
小程序 JavaScript Java
微信小程序+SpringBoot接入后台服务,接口数据来自后端
这篇文章介绍了如何将微信小程序与SpringBoot后端服务进行数据交互,包括后端接口的编写、小程序获取接口数据的方法,以及数据在小程序中的展示。同时,还涉及到了使用Vue搭建后台管理系统,方便数据的查看和管理。
微信小程序+SpringBoot接入后台服务,接口数据来自后端
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
843 1