淘东电商项目(66) -聚合支付(基于RabbitMQ解决分布式事务-积分场景)

简介: 淘东电商项目(66) -聚合支付(基于RabbitMQ解决分布式事务-积分场景)

引言

本文代码已提交至Github(版本号:52553aa6fe8b34ff162a1fb33e8f58494b4d2c3f),有兴趣的同学可以下载来看看:https://github.com/ylw-github/taodong-shop

阅读本文前,有兴趣的同学可以参考我之前写的聚合支付的文章:

本文讲解聚合支付最后的一个问题 - 分布式事务。举个例子,比如要增加一个“积分功能”,当第三方服务器异步返回支付成功结果,请求我们的支付服务器时,同时也要做积分增加的功能,如何能保证,支付结果插入数据库成功的同时保证积分一定能增加成功呢?这里涉及到了分布式事务的问题,本文主要基于Rabbit来解决这个问题。

本文目录结构:

l____引言

l____ 1.原理图

l____ 2.积分数据库建表

l____ 3.核心代码

l________ 3.1 集成RabbitMQ

l________ 3.2 生产者代码

l________ 3.3 消费者代码

l____ 4.测试

1.原理图

如上图,如果支付成功,第三方支付服务器会请求项目的支付服务,返回支付结果,这个时候,我们代码要处理的是如下步骤:

  1. 更新订单状态为“已支付”,即status为1(注意,这里的方法使用了@Transactional事务注解修饰)
  2. 更新了支付状态之后,会使用MQ来生产消息,生产增加积分消息MSG
  3. 如果这个时候程序出错,会回滚,也就是订单的状态在数据库中没有修改,而已经增加了积分。

针对以上的问题,做出了如下的解决方案:

  • 对于第2个步骤,使用RabbitMQ的消息确认机制,保证消息一定可以投递到RabbitMQ服务器的增加积分队列,消费者使用手动签收的方式,保证消息一定可以消费到,并把增加积分消息更新到数据库的积分表中。
  • 对于第3个步骤,如果程序出错了,会回滚,因此数据库部分的代码不生效,订单的支付状态没变,所以增加多了一个支付状态补偿队列,当支付状态补偿消费者接收到消息后,会检查支付状态是否已经修改,如果没有修改,则更新订单的状态。

从上面的解决步骤,可以知道,使用RabbitMQ保证了积分一定可以更新本地数据库,同时订单状态一定可以修改,达到最终一致性的效果,同时解决了分布式事务的问题。

2.积分数据库建表

讲解前,先贴上积分数据库的建表语句:

CREATE TABLE `integral` (
  `ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `USER_ID` int(11) DEFAULT NULL COMMENT '用户ID',
  `PAYMENT_ID` varchar(1024) DEFAULT NULL COMMENT '支付ID',
  `INTEGRAL` varchar(32) DEFAULT NULL COMMENT '积分',
  `AVAILABILITY` int(11) DEFAULT NULL COMMENT '是否可用',
  `REVISION` int(11) DEFAULT NULL COMMENT '乐观锁',
  `CREATED_BY` varchar(32) DEFAULT NULL COMMENT '创建人',
  `CREATED_TIME` datetime DEFAULT NULL COMMENT '创建时间',
  `UPDATED_BY` varchar(32) DEFAULT NULL COMMENT '更新人',
  `UPDATED_TIME` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8 COMMENT=' ';

3.核心代码

3.1 集成RabbitMQ

RabbitMQ的搭建本文不再详述,之前有讲解过,有兴趣的童鞋可以参阅之前写过的文章: 《消息中间件系列教程(04) -RabbitMQ -简介&安装》,下面开始讲解项目集成。

①添加maven依赖:

<!-- 添加springboot对amqp的支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

②applicatoin.yml配置:

spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: guest
    ####密码
    password: guest
    ### 地址
    virtual-host: integral_host
    ###开启消息确认机制 confirms
    publisher-confirms: true
    publisher-returns: true

③RabbitMQ配置文件:

@Component
public class RabbitmqConfig {
    // 添加积分队列
    public static final String INTEGRAL_DIC_QUEUE = "integral_queue";
    // 补单队列,
    public static final String INTEGRAL_CREATE_QUEUE = "integral_create_queue";
    // 积分交换机
    private static final String INTEGRAL_EXCHANGE_NAME = "integral_exchange_name";
    // 1.定义订单队列
    @Bean
    public Queue directIntegralDicQueue() {
        return new Queue(INTEGRAL_DIC_QUEUE);
    }
    // 2.定义补订单队列
    @Bean
    public Queue directCreateintegralQueue() {
        return new Queue(INTEGRAL_CREATE_QUEUE);
    }
    // 2.定义交换机
    @Bean
    DirectExchange directintegralExchange() {
        return new DirectExchange(INTEGRAL_EXCHANGE_NAME);
    }
    // 3.积分队列与交换机绑定
    @Bean
    Binding bindingExchangeintegralDicQueue() {
        return BindingBuilder.bind(directIntegralDicQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }
    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateintegral() {
        return BindingBuilder.bind(directCreateintegralQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }
}

③在RabbitMQ控制台增加virtual-host:

④分配guest对新增的virtual-host有用户权限:

3.2 生产者代码

①生产者代码(注意里面用了消息确认机制,且使用订单的id作为全局唯一id来解决幂等性的问题):

/**
 * description: 生产者投递积分
 * create by: YangLinWei
 * create time: 2020/5/19 11:37 上午
 */
@Component
@Slf4j
public class IntegralProducer implements RabbitTemplate.ConfirmCallback {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Transactional
  public void send(JSONObject jsonObject) {
    String jsonString = jsonObject.toJSONString();
    System.out.println("jsonString:" + jsonString);
    String paymentId = jsonObject.getString("paymentId");
    // 封装消息
    Message message = MessageBuilder.withBody(jsonString.getBytes())
        .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(paymentId)
        .build();
    // 构建回调返回的数据(消息id)
    this.rabbitTemplate.setMandatory(true);
    this.rabbitTemplate.setConfirmCallback(this);
    CorrelationData correlationData = new CorrelationData(jsonString);
    rabbitTemplate.convertAndSend("integral_exchange_name", "integralRoutingKey", message, correlationData);
  }
  // 生产消息确认机制 生产者往服务器端发送消息的时候,采用应答机制
  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    String jsonString = correlationData.getId();
    System.out.println("消息id:" + correlationData.getId());
    if (ack) {
      log.info(">>>使用MQ消息确认机制确保消息一定要投递到MQ中成功");
      return;
    }
    JSONObject jsonObject = JSONObject.parseObject(jsonString);
    // 生产者消息投递失败的话,采用递归重试机制
    send(jsonObject);
    log.info(">>>使用MQ消息确认机制投递到MQ中失败");
  }
}

②调用生产者处的代码,在支付结果异步回调处处理(银联支付结果异步回调处处理UnionPayCallbackTemplate,注意发送MQ使用了@Async注解,不阻塞当前线程)注意下面模拟抛异常了:

@Override
public String asyncService(Map<String, String> verifySignature) {
  String orderId = verifySignature.get("orderId"); // 获取后台通知的数据,其他字段也可用类似方式获取
  String respCode = verifySignature.get("respCode");
  // 判断respCode=00、A6后,对涉及资金类的交易,请再发起查询接口查询,确定交易成功后更新数据库。
  System.out.println("orderId:" + orderId + ",respCode:" + respCode);
  // 1.判断respCode是否为已经支付成功断respCode=00、A6后,
  if (!(respCode.equals("00") || respCode.equals("A6"))) {
    return failResult();
  }
  // 根据日志 手动补偿 使用支付id调用第三方支付接口查询
  PaymentTransactionEntity paymentTransaction = paymentTransactionMapper.selectByPaymentId(orderId);
  if (paymentTransaction.getPaymentStatus().equals(PayConstant.PAY_STATUS_SUCCESS)) {
    // 网络重试中,之前已经支付过
    return successResult();
  }
  // 2.将状态改为已经支付成功
  paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", orderId+"","yinlian_pay");
  // 3.调用积分服务接口增加积分(处理幂等性问题) MQ
  addMQIntegral(paymentTransaction); // 使用MQ
  int i = 1 / 0; // 支付状态还是为待支付状态但是 积分缺增加
  return successResult();
}
/**
 * 基于MQ增加积分
 */
@Async
public void addMQIntegral(PaymentTransactionEntity paymentTransaction) {
  JSONObject jsonObject = new JSONObject();
  jsonObject.put("paymentId", paymentTransaction.getPaymentId());
  jsonObject.put("userId", paymentTransaction.getUserId());
  jsonObject.put("integral", 100);
  integralProducer.send(jsonObject);
}

3.3 消费者代码

①首先看看支付状态补偿消费者代码(注意这里使用了手动签收):

/**
 * description: 支付回调检查状态,是否为已经支付完成
 * create by: YangLinWei
 * create time: 2020/5/19 1:52 下午
 */
@Component
@Slf4j
public class PayCheckStateConsumer {
    @Autowired
    private PaymentTransactionMapper paymentTransactionMapper;
    // 死信队列(备胎) 消息被拒绝、队列长度满了 定时任务 人工补偿
    @RabbitListener(queues = "integral_create_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        try {
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            log.info(">>>messageId:{},msg:{}", messageId, msg);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            String paymentId = jsonObject.getString("paymentId");
            if (StringUtils.isEmpty(paymentId)) {
                log.error(">>>>支付id不能为空 paymentId:{}", paymentId);
                basicNack(message, channel);
                return;
            }
            // 1.使用paymentId查询之前是否已经支付过
            PaymentTransactionEntity paymentTransactionEntity = paymentTransactionMapper.selectByPaymentId(paymentId);
            if (paymentTransactionEntity == null) {
                log.error(">>>>支付id paymentId:{} 未查询到", paymentId);
                basicNack(message, channel);
                return;
            }
            Integer paymentStatus = paymentTransactionEntity.getPaymentStatus();
            if (paymentStatus.equals(PayConstant.PAY_STATUS_SUCCESS)) {
                log.error(">>>>支付id paymentId:{} ", paymentId);
                basicNack(message, channel);
                return;
            }
            // 安全期间 主动调用第三方接口查询
            String paymentChannel = jsonObject.getString("paymentChannel");
            int updatePaymentStatus = paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "",
                    paymentId, paymentChannel);
            if (updatePaymentStatus > 0) {
                basicNack(message, channel);
                return;
            }
            // 继续重试
        } catch (Exception e) {
            e.printStackTrace();
            basicNack(message, channel);
        }
    }
    private void basicNack(Message message, Channel channel) throws IOException {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

②增加积分消费者代码(注意这里使用了手动签收)::

/**
 * description: 积分服务消费者
 * create by: YangLinWei
 * create time: 2020/5/19 2:10 下午
 */
@Component
@Slf4j
public class IntegralConsumer {
  @Autowired
  private IntegralMapper integralMapper;
  @RabbitListener(queues = "integral_queue")
  public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
    try {
      String messageId = message.getMessageProperties().getMessageId();
      String msg = new String(message.getBody(), "UTF-8");
      log.info(">>>messageId:{},msg:{}", messageId, msg);
      JSONObject jsonObject = JSONObject.parseObject(msg);
      String paymentId = jsonObject.getString("paymentId");
      if (StringUtils.isEmpty(paymentId)) {
        log.error(">>>>支付id不能为空 paymentId:{}", paymentId);
        basicNack(message, channel);
        return;
      }
      // 使用paymentId查询是否已经增加过积分 网络重试间隔
      IntegralEntity resultIntegralEntity = integralMapper.findIntegral(paymentId);
      if (resultIntegralEntity != null) {
        log.error(">>>>paymentId:{}已经增加过积分", paymentId);
        // 已经增加过积分,通知MQ不要在继续重试。
        basicNack(message, channel);
        return;
      }
      Integer userId = jsonObject.getInteger("userId");
      if (userId == null) {
        log.error(">>>>paymentId:{},对应的用户userId参数为空", paymentId);
        basicNack(message, channel);
        return;
      }
      Long integral = jsonObject.getLong("integral");
      if (integral == null) {
        log.error(">>>>paymentId:{},对应的用户integral参数为空", integral);
        return;
      }
      IntegralEntity integralEntity = new IntegralEntity();
      integralEntity.setPaymentId(paymentId);
      integralEntity.setIntegral(integral);
      integralEntity.setUserId(userId);
      integralEntity.setAvailability(1);
      // 插入到数据库中
      int insertIntegral = integralMapper.insertIntegral(integralEntity);
      if (insertIntegral > 0) {
        // 手动签收消息,通知mq服务器端删除该消息
        basicNack(message, channel);
      }
      // 采用重试机制
    } catch (Exception e) {
      log.error(">>>>ERROR MSG:", e.getMessage());
      basicNack(message, channel);
    }
  }
  // 消费者获取到消息之后 手动签收 通知MQ删除该消息
  private void basicNack(Message message, Channel channel) throws IOException {
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  }
}

4.测试

依次启动Eureka注册中心、xxlsso单点登录系统、member会员服务、pay支付服务、pay-web支付门户服务、还有integral积分服务,启动后如下图:

启动RabbitMQ服务(我的是Mac系统,已经启动的可以忽略):

cd /usr/local/Cellar/rabbitmq/3.8.2/sbin
./rabbitmq-server -detached

①模拟新增订单,浏览器输入:http://localhost:8600/cratePayToken?payAmount=999&orderId=20200513141452&userId=27&productName=玉米香肠

②确认提交订单,浏览器输入:http://localhost:8079/pay?payToken=pay_88c6262f3a494ae98d0873283514abf5

可以看到当前数据库,订单状态为未支付:

③按照提示,使用银联支付,一步一步直至支付完成:

可以看到,订单支付状态为已支付(也就是说订单支付状态补偿消费者已经接收到消息,并处理订单为已支付):

而且积分表也增加了一条数据(也是是说增加积分消费者已收到消息,并增加了一条积分数据):

本文完!

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
929 5
|
5月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
379 3
|
5月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
256 0
|
9月前
|
消息中间件 存储 Java
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
|
7月前
|
消息中间件 存储 前端开发
MQ有什么应用场景
MQ有什么应用场景
|
10月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
280 10
|
10月前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
293 7
|
12月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
11月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
361 8
|
10月前
|
调度 数据库
什么场景下要使用分布式锁
分布式锁用于确保多节点环境下的资源互斥访问、避免重复操作、控制并发流量、防止竞态条件及任务调度协调,常见于防止超卖等问题。
277 4

热门文章

最新文章

下一篇
日志分析软件