淘东电商项目(66) -聚合支付(基于RabbitMQ解决分布式事务-积分场景)

简介: 淘东电商项目(66) -聚合支付(基于RabbitMQ解决分布式事务-积分场景)

引言

本文代码已提交至Github(版本号:52553aa6fe8b34ff162a1fb33e8f58494b4d2c3f),有兴趣的同学可以下载来看看:https://github.com/ylw-github/taodong-shop

阅读本文前,有兴趣的同学可以参考我之前写的聚合支付的文章:

本文讲解聚合支付最后的一个问题 - 分布式事务。举个例子,比如要增加一个“积分功能”,当第三方服务器异步返回支付成功结果,请求我们的支付服务器时,同时也要做积分增加的功能,如何能保证,支付结果插入数据库成功的同时保证积分一定能增加成功呢?这里涉及到了分布式事务的问题,本文主要基于Rabbit来解决这个问题。

本文目录结构:

l____引言

l____ 1.原理图

l____ 2.积分数据库建表

l____ 3.核心代码

l________ 3.1 集成RabbitMQ

l________ 3.2 生产者代码

l________ 3.3 消费者代码

l____ 4.测试

1.原理图

如上图,如果支付成功,第三方支付服务器会请求项目的支付服务,返回支付结果,这个时候,我们代码要处理的是如下步骤:

  1. 更新订单状态为“已支付”,即status为1(注意,这里的方法使用了@Transactional事务注解修饰)
  2. 更新了支付状态之后,会使用MQ来生产消息,生产增加积分消息MSG
  3. 如果这个时候程序出错,会回滚,也就是订单的状态在数据库中没有修改,而已经增加了积分。

针对以上的问题,做出了如下的解决方案:

  • 对于第2个步骤,使用RabbitMQ的消息确认机制,保证消息一定可以投递到RabbitMQ服务器的增加积分队列,消费者使用手动签收的方式,保证消息一定可以消费到,并把增加积分消息更新到数据库的积分表中。
  • 对于第3个步骤,如果程序出错了,会回滚,因此数据库部分的代码不生效,订单的支付状态没变,所以增加多了一个支付状态补偿队列,当支付状态补偿消费者接收到消息后,会检查支付状态是否已经修改,如果没有修改,则更新订单的状态。

从上面的解决步骤,可以知道,使用RabbitMQ保证了积分一定可以更新本地数据库,同时订单状态一定可以修改,达到最终一致性的效果,同时解决了分布式事务的问题。

2.积分数据库建表

讲解前,先贴上积分数据库的建表语句:

CREATE TABLE `integral` (
  `ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `USER_ID` int(11) DEFAULT NULL COMMENT '用户ID',
  `PAYMENT_ID` varchar(1024) DEFAULT NULL COMMENT '支付ID',
  `INTEGRAL` varchar(32) DEFAULT NULL COMMENT '积分',
  `AVAILABILITY` int(11) DEFAULT NULL COMMENT '是否可用',
  `REVISION` int(11) DEFAULT NULL COMMENT '乐观锁',
  `CREATED_BY` varchar(32) DEFAULT NULL COMMENT '创建人',
  `CREATED_TIME` datetime DEFAULT NULL COMMENT '创建时间',
  `UPDATED_BY` varchar(32) DEFAULT NULL COMMENT '更新人',
  `UPDATED_TIME` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8 COMMENT=' ';

3.核心代码

3.1 集成RabbitMQ

RabbitMQ的搭建本文不再详述,之前有讲解过,有兴趣的童鞋可以参阅之前写过的文章: 《消息中间件系列教程(04) -RabbitMQ -简介&安装》,下面开始讲解项目集成。

①添加maven依赖:

<!-- 添加springboot对amqp的支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

②applicatoin.yml配置:

spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: guest
    ####密码
    password: guest
    ### 地址
    virtual-host: integral_host
    ###开启消息确认机制 confirms
    publisher-confirms: true
    publisher-returns: true

③RabbitMQ配置文件:

@Component
public class RabbitmqConfig {
    // 添加积分队列
    public static final String INTEGRAL_DIC_QUEUE = "integral_queue";
    // 补单队列,
    public static final String INTEGRAL_CREATE_QUEUE = "integral_create_queue";
    // 积分交换机
    private static final String INTEGRAL_EXCHANGE_NAME = "integral_exchange_name";
    // 1.定义订单队列
    @Bean
    public Queue directIntegralDicQueue() {
        return new Queue(INTEGRAL_DIC_QUEUE);
    }
    // 2.定义补订单队列
    @Bean
    public Queue directCreateintegralQueue() {
        return new Queue(INTEGRAL_CREATE_QUEUE);
    }
    // 2.定义交换机
    @Bean
    DirectExchange directintegralExchange() {
        return new DirectExchange(INTEGRAL_EXCHANGE_NAME);
    }
    // 3.积分队列与交换机绑定
    @Bean
    Binding bindingExchangeintegralDicQueue() {
        return BindingBuilder.bind(directIntegralDicQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }
    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateintegral() {
        return BindingBuilder.bind(directCreateintegralQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }
}

③在RabbitMQ控制台增加virtual-host:

④分配guest对新增的virtual-host有用户权限:

3.2 生产者代码

①生产者代码(注意里面用了消息确认机制,且使用订单的id作为全局唯一id来解决幂等性的问题):

/**
 * description: 生产者投递积分
 * create by: YangLinWei
 * create time: 2020/5/19 11:37 上午
 */
@Component
@Slf4j
public class IntegralProducer implements RabbitTemplate.ConfirmCallback {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Transactional
  public void send(JSONObject jsonObject) {
    String jsonString = jsonObject.toJSONString();
    System.out.println("jsonString:" + jsonString);
    String paymentId = jsonObject.getString("paymentId");
    // 封装消息
    Message message = MessageBuilder.withBody(jsonString.getBytes())
        .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(paymentId)
        .build();
    // 构建回调返回的数据(消息id)
    this.rabbitTemplate.setMandatory(true);
    this.rabbitTemplate.setConfirmCallback(this);
    CorrelationData correlationData = new CorrelationData(jsonString);
    rabbitTemplate.convertAndSend("integral_exchange_name", "integralRoutingKey", message, correlationData);
  }
  // 生产消息确认机制 生产者往服务器端发送消息的时候,采用应答机制
  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    String jsonString = correlationData.getId();
    System.out.println("消息id:" + correlationData.getId());
    if (ack) {
      log.info(">>>使用MQ消息确认机制确保消息一定要投递到MQ中成功");
      return;
    }
    JSONObject jsonObject = JSONObject.parseObject(jsonString);
    // 生产者消息投递失败的话,采用递归重试机制
    send(jsonObject);
    log.info(">>>使用MQ消息确认机制投递到MQ中失败");
  }
}

②调用生产者处的代码,在支付结果异步回调处处理(银联支付结果异步回调处处理UnionPayCallbackTemplate,注意发送MQ使用了@Async注解,不阻塞当前线程)注意下面模拟抛异常了:

@Override
public String asyncService(Map<String, String> verifySignature) {
  String orderId = verifySignature.get("orderId"); // 获取后台通知的数据,其他字段也可用类似方式获取
  String respCode = verifySignature.get("respCode");
  // 判断respCode=00、A6后,对涉及资金类的交易,请再发起查询接口查询,确定交易成功后更新数据库。
  System.out.println("orderId:" + orderId + ",respCode:" + respCode);
  // 1.判断respCode是否为已经支付成功断respCode=00、A6后,
  if (!(respCode.equals("00") || respCode.equals("A6"))) {
    return failResult();
  }
  // 根据日志 手动补偿 使用支付id调用第三方支付接口查询
  PaymentTransactionEntity paymentTransaction = paymentTransactionMapper.selectByPaymentId(orderId);
  if (paymentTransaction.getPaymentStatus().equals(PayConstant.PAY_STATUS_SUCCESS)) {
    // 网络重试中,之前已经支付过
    return successResult();
  }
  // 2.将状态改为已经支付成功
  paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", orderId+"","yinlian_pay");
  // 3.调用积分服务接口增加积分(处理幂等性问题) MQ
  addMQIntegral(paymentTransaction); // 使用MQ
  int i = 1 / 0; // 支付状态还是为待支付状态但是 积分缺增加
  return successResult();
}
/**
 * 基于MQ增加积分
 */
@Async
public void addMQIntegral(PaymentTransactionEntity paymentTransaction) {
  JSONObject jsonObject = new JSONObject();
  jsonObject.put("paymentId", paymentTransaction.getPaymentId());
  jsonObject.put("userId", paymentTransaction.getUserId());
  jsonObject.put("integral", 100);
  integralProducer.send(jsonObject);
}

3.3 消费者代码

①首先看看支付状态补偿消费者代码(注意这里使用了手动签收):

/**
 * description: 支付回调检查状态,是否为已经支付完成
 * create by: YangLinWei
 * create time: 2020/5/19 1:52 下午
 */
@Component
@Slf4j
public class PayCheckStateConsumer {
    @Autowired
    private PaymentTransactionMapper paymentTransactionMapper;
    // 死信队列(备胎) 消息被拒绝、队列长度满了 定时任务 人工补偿
    @RabbitListener(queues = "integral_create_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        try {
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            log.info(">>>messageId:{},msg:{}", messageId, msg);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            String paymentId = jsonObject.getString("paymentId");
            if (StringUtils.isEmpty(paymentId)) {
                log.error(">>>>支付id不能为空 paymentId:{}", paymentId);
                basicNack(message, channel);
                return;
            }
            // 1.使用paymentId查询之前是否已经支付过
            PaymentTransactionEntity paymentTransactionEntity = paymentTransactionMapper.selectByPaymentId(paymentId);
            if (paymentTransactionEntity == null) {
                log.error(">>>>支付id paymentId:{} 未查询到", paymentId);
                basicNack(message, channel);
                return;
            }
            Integer paymentStatus = paymentTransactionEntity.getPaymentStatus();
            if (paymentStatus.equals(PayConstant.PAY_STATUS_SUCCESS)) {
                log.error(">>>>支付id paymentId:{} ", paymentId);
                basicNack(message, channel);
                return;
            }
            // 安全期间 主动调用第三方接口查询
            String paymentChannel = jsonObject.getString("paymentChannel");
            int updatePaymentStatus = paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "",
                    paymentId, paymentChannel);
            if (updatePaymentStatus > 0) {
                basicNack(message, channel);
                return;
            }
            // 继续重试
        } catch (Exception e) {
            e.printStackTrace();
            basicNack(message, channel);
        }
    }
    private void basicNack(Message message, Channel channel) throws IOException {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

②增加积分消费者代码(注意这里使用了手动签收)::

/**
 * description: 积分服务消费者
 * create by: YangLinWei
 * create time: 2020/5/19 2:10 下午
 */
@Component
@Slf4j
public class IntegralConsumer {
  @Autowired
  private IntegralMapper integralMapper;
  @RabbitListener(queues = "integral_queue")
  public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
    try {
      String messageId = message.getMessageProperties().getMessageId();
      String msg = new String(message.getBody(), "UTF-8");
      log.info(">>>messageId:{},msg:{}", messageId, msg);
      JSONObject jsonObject = JSONObject.parseObject(msg);
      String paymentId = jsonObject.getString("paymentId");
      if (StringUtils.isEmpty(paymentId)) {
        log.error(">>>>支付id不能为空 paymentId:{}", paymentId);
        basicNack(message, channel);
        return;
      }
      // 使用paymentId查询是否已经增加过积分 网络重试间隔
      IntegralEntity resultIntegralEntity = integralMapper.findIntegral(paymentId);
      if (resultIntegralEntity != null) {
        log.error(">>>>paymentId:{}已经增加过积分", paymentId);
        // 已经增加过积分,通知MQ不要在继续重试。
        basicNack(message, channel);
        return;
      }
      Integer userId = jsonObject.getInteger("userId");
      if (userId == null) {
        log.error(">>>>paymentId:{},对应的用户userId参数为空", paymentId);
        basicNack(message, channel);
        return;
      }
      Long integral = jsonObject.getLong("integral");
      if (integral == null) {
        log.error(">>>>paymentId:{},对应的用户integral参数为空", integral);
        return;
      }
      IntegralEntity integralEntity = new IntegralEntity();
      integralEntity.setPaymentId(paymentId);
      integralEntity.setIntegral(integral);
      integralEntity.setUserId(userId);
      integralEntity.setAvailability(1);
      // 插入到数据库中
      int insertIntegral = integralMapper.insertIntegral(integralEntity);
      if (insertIntegral > 0) {
        // 手动签收消息,通知mq服务器端删除该消息
        basicNack(message, channel);
      }
      // 采用重试机制
    } catch (Exception e) {
      log.error(">>>>ERROR MSG:", e.getMessage());
      basicNack(message, channel);
    }
  }
  // 消费者获取到消息之后 手动签收 通知MQ删除该消息
  private void basicNack(Message message, Channel channel) throws IOException {
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  }
}

4.测试

依次启动Eureka注册中心、xxlsso单点登录系统、member会员服务、pay支付服务、pay-web支付门户服务、还有integral积分服务,启动后如下图:

启动RabbitMQ服务(我的是Mac系统,已经启动的可以忽略):

cd /usr/local/Cellar/rabbitmq/3.8.2/sbin
./rabbitmq-server -detached

①模拟新增订单,浏览器输入:http://localhost:8600/cratePayToken?payAmount=999&orderId=20200513141452&userId=27&productName=玉米香肠

②确认提交订单,浏览器输入:http://localhost:8079/pay?payToken=pay_88c6262f3a494ae98d0873283514abf5

可以看到当前数据库,订单状态为未支付:

③按照提示,使用银联支付,一步一步直至支付完成:

可以看到,订单支付状态为已支付(也就是说订单支付状态补偿消费者已经接收到消息,并处理订单为已支付):

而且积分表也增加了一条数据(也是是说增加积分消费者已收到消息,并增加了一条积分数据):

本文完!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
91 1
|
2月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
55 2
|
2月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
2月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
3月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
4月前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
305 6
|
4月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
3月前
|
运维 监控 Java
在大数据场景下,Elasticsearch作为分布式搜索与分析引擎,因其扩展性和易用性成为全文检索首选。
【7月更文挑战第1天】在大数据场景下,Elasticsearch作为分布式搜索与分析引擎,因其扩展性和易用性成为全文检索首选。本文讲解如何在Java中集成Elasticsearch,包括安装配置、使用RestHighLevelClient连接、创建索引和文档操作,以及全文检索查询。此外,还涉及高级查询、性能优化和故障排查,帮助开发者高效处理非结构化数据。
58 0