引言
本文代码已提交至Github(版本号:
52553aa6fe8b34ff162a1fb33e8f58494b4d2c3f
),有兴趣的同学可以下载来看看:https://github.com/ylw-github/taodong-shop
阅读本文前,有兴趣的同学可以参考我之前写的聚合支付的文章:
- 《淘东电商项目(52) -聚合支付开篇》
- 《淘东电商项目(53) -银联支付案例源码分析》
- 《淘东电商项目(54) -银联支付案例(同步与异步)》
- 《淘东电商项目(55) -支付系统核心表设计》
- 《淘东电商项目(56) -支付系统分布式事务的解决方案》
- 《淘东电商项目(57) -聚合支付(支付令牌接口)》
- 《淘东电商项目(58) -聚合支付(基于设计模式自动跳转支付接口)》
- 《淘东电商项目(59) -聚合支付(集成银联支付)》
- 《淘东电商项目(60) -聚合支付(集成支付宝)》
- 《淘东电商项目(61) -聚合支付(基于模板方法设计模式管理支付回调)》
- 《淘东电商项目(62) -聚合支付(基于模板方法设计模式管理支付回调-支付宝)》
- 《淘东电商项目(63) -聚合支付(多线程日志收集)》
- 《淘东电商项目(64) -聚合支付(XXL-JOB任务调度平台整合)》
- 《淘东电商项目(65) -聚合支付(异步对账)》
本文讲解聚合支付最后的一个问题 - 分布式事务。举个例子,比如要增加一个“积分功能”,当第三方服务器异步返回支付成功结果,请求我们的支付服务器时,同时也要做积分增加的功能,如何能保证,支付结果插入数据库成功的同时保证积分一定能增加成功呢?这里涉及到了分布式事务的问题,本文主要基于Rabbit来解决这个问题。
本文目录结构:
1.原理图
如上图,如果支付成功,第三方支付服务器会请求项目的支付服务,返回支付结果,这个时候,我们代码要处理的是如下步骤:
- 更新订单状态为“已支付”,即
status
为1(注意,这里的方法使用了@Transactional
事务注解修饰) - 更新了支付状态之后,会使用MQ来生产消息,生产增加积分消息MSG
- 如果这个时候程序出错,会回滚,也就是订单的状态在数据库中没有修改,而已经增加了积分。
针对以上的问题,做出了如下的解决方案:
- 对于第2个步骤,使用RabbitMQ的消息确认机制,保证消息一定可以投递到RabbitMQ服务器的增加积分队列,消费者使用手动签收的方式,保证消息一定可以消费到,并把增加积分消息更新到数据库的积分表中。
- 对于第3个步骤,如果程序出错了,会回滚,因此数据库部分的代码不生效,订单的支付状态没变,所以增加多了一个支付状态补偿队列,当支付状态补偿消费者接收到消息后,会检查支付状态是否已经修改,如果没有修改,则更新订单的状态。
从上面的解决步骤,可以知道,使用RabbitMQ保证了积分一定可以更新本地数据库,同时订单状态一定可以修改,达到最终一致性的效果,同时解决了分布式事务的问题。
2.积分数据库建表
讲解前,先贴上积分数据库的建表语句:
CREATE TABLE `integral` ( `ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID', `USER_ID` int(11) DEFAULT NULL COMMENT '用户ID', `PAYMENT_ID` varchar(1024) DEFAULT NULL COMMENT '支付ID', `INTEGRAL` varchar(32) DEFAULT NULL COMMENT '积分', `AVAILABILITY` int(11) DEFAULT NULL COMMENT '是否可用', `REVISION` int(11) DEFAULT NULL COMMENT '乐观锁', `CREATED_BY` varchar(32) DEFAULT NULL COMMENT '创建人', `CREATED_TIME` datetime DEFAULT NULL COMMENT '创建时间', `UPDATED_BY` varchar(32) DEFAULT NULL COMMENT '更新人', `UPDATED_TIME` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8 COMMENT=' ';
3.核心代码
3.1 集成RabbitMQ
RabbitMQ的搭建本文不再详述,之前有讲解过,有兴趣的童鞋可以参阅之前写过的文章: 《消息中间件系列教程(04) -RabbitMQ -简介&安装》,下面开始讲解项目集成。
①添加maven依赖:
<!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
②applicatoin.yml配置:
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: integral_host ###开启消息确认机制 confirms publisher-confirms: true publisher-returns: true
③RabbitMQ配置文件:
@Component public class RabbitmqConfig { // 添加积分队列 public static final String INTEGRAL_DIC_QUEUE = "integral_queue"; // 补单队列, public static final String INTEGRAL_CREATE_QUEUE = "integral_create_queue"; // 积分交换机 private static final String INTEGRAL_EXCHANGE_NAME = "integral_exchange_name"; // 1.定义订单队列 @Bean public Queue directIntegralDicQueue() { return new Queue(INTEGRAL_DIC_QUEUE); } // 2.定义补订单队列 @Bean public Queue directCreateintegralQueue() { return new Queue(INTEGRAL_CREATE_QUEUE); } // 2.定义交换机 @Bean DirectExchange directintegralExchange() { return new DirectExchange(INTEGRAL_EXCHANGE_NAME); } // 3.积分队列与交换机绑定 @Bean Binding bindingExchangeintegralDicQueue() { return BindingBuilder.bind(directIntegralDicQueue()).to(directintegralExchange()).with("integralRoutingKey"); } // 3.补单队列与交换机绑定 @Bean Binding bindingExchangeCreateintegral() { return BindingBuilder.bind(directCreateintegralQueue()).to(directintegralExchange()).with("integralRoutingKey"); } }
③在RabbitMQ控制台增加virtual-host:
④分配guest对新增的virtual-host有用户权限:
3.2 生产者代码
①生产者代码(注意里面用了消息确认机制,且使用订单的id作为全局唯一id来解决幂等性的问题):
/** * description: 生产者投递积分 * create by: YangLinWei * create time: 2020/5/19 11:37 上午 */ @Component @Slf4j public class IntegralProducer implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @Transactional public void send(JSONObject jsonObject) { String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); String paymentId = jsonObject.getString("paymentId"); // 封装消息 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(paymentId) .build(); // 构建回调返回的数据(消息id) this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); CorrelationData correlationData = new CorrelationData(jsonString); rabbitTemplate.convertAndSend("integral_exchange_name", "integralRoutingKey", message, correlationData); } // 生产消息确认机制 生产者往服务器端发送消息的时候,采用应答机制 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String jsonString = correlationData.getId(); System.out.println("消息id:" + correlationData.getId()); if (ack) { log.info(">>>使用MQ消息确认机制确保消息一定要投递到MQ中成功"); return; } JSONObject jsonObject = JSONObject.parseObject(jsonString); // 生产者消息投递失败的话,采用递归重试机制 send(jsonObject); log.info(">>>使用MQ消息确认机制投递到MQ中失败"); } }
②调用生产者处的代码,在支付结果异步回调处处理(银联支付结果异步回调处处理UnionPayCallbackTemplate
,注意发送MQ使用了@Async
注解,不阻塞当前线程)注意下面模拟抛异常了:
@Override public String asyncService(Map<String, String> verifySignature) { String orderId = verifySignature.get("orderId"); // 获取后台通知的数据,其他字段也可用类似方式获取 String respCode = verifySignature.get("respCode"); // 判断respCode=00、A6后,对涉及资金类的交易,请再发起查询接口查询,确定交易成功后更新数据库。 System.out.println("orderId:" + orderId + ",respCode:" + respCode); // 1.判断respCode是否为已经支付成功断respCode=00、A6后, if (!(respCode.equals("00") || respCode.equals("A6"))) { return failResult(); } // 根据日志 手动补偿 使用支付id调用第三方支付接口查询 PaymentTransactionEntity paymentTransaction = paymentTransactionMapper.selectByPaymentId(orderId); if (paymentTransaction.getPaymentStatus().equals(PayConstant.PAY_STATUS_SUCCESS)) { // 网络重试中,之前已经支付过 return successResult(); } // 2.将状态改为已经支付成功 paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", orderId+"","yinlian_pay"); // 3.调用积分服务接口增加积分(处理幂等性问题) MQ addMQIntegral(paymentTransaction); // 使用MQ int i = 1 / 0; // 支付状态还是为待支付状态但是 积分缺增加 return successResult(); } /** * 基于MQ增加积分 */ @Async public void addMQIntegral(PaymentTransactionEntity paymentTransaction) { JSONObject jsonObject = new JSONObject(); jsonObject.put("paymentId", paymentTransaction.getPaymentId()); jsonObject.put("userId", paymentTransaction.getUserId()); jsonObject.put("integral", 100); integralProducer.send(jsonObject); }
3.3 消费者代码
①首先看看支付状态补偿消费者代码(注意这里使用了手动签收):
/** * description: 支付回调检查状态,是否为已经支付完成 * create by: YangLinWei * create time: 2020/5/19 1:52 下午 */ @Component @Slf4j public class PayCheckStateConsumer { @Autowired private PaymentTransactionMapper paymentTransactionMapper; // 死信队列(备胎) 消息被拒绝、队列长度满了 定时任务 人工补偿 @RabbitListener(queues = "integral_create_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException { try { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); log.info(">>>messageId:{},msg:{}", messageId, msg); JSONObject jsonObject = JSONObject.parseObject(msg); String paymentId = jsonObject.getString("paymentId"); if (StringUtils.isEmpty(paymentId)) { log.error(">>>>支付id不能为空 paymentId:{}", paymentId); basicNack(message, channel); return; } // 1.使用paymentId查询之前是否已经支付过 PaymentTransactionEntity paymentTransactionEntity = paymentTransactionMapper.selectByPaymentId(paymentId); if (paymentTransactionEntity == null) { log.error(">>>>支付id paymentId:{} 未查询到", paymentId); basicNack(message, channel); return; } Integer paymentStatus = paymentTransactionEntity.getPaymentStatus(); if (paymentStatus.equals(PayConstant.PAY_STATUS_SUCCESS)) { log.error(">>>>支付id paymentId:{} ", paymentId); basicNack(message, channel); return; } // 安全期间 主动调用第三方接口查询 String paymentChannel = jsonObject.getString("paymentChannel"); int updatePaymentStatus = paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", paymentId, paymentChannel); if (updatePaymentStatus > 0) { basicNack(message, channel); return; } // 继续重试 } catch (Exception e) { e.printStackTrace(); basicNack(message, channel); } } private void basicNack(Message message, Channel channel) throws IOException { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
②增加积分消费者代码(注意这里使用了手动签收)::
/** * description: 积分服务消费者 * create by: YangLinWei * create time: 2020/5/19 2:10 下午 */ @Component @Slf4j public class IntegralConsumer { @Autowired private IntegralMapper integralMapper; @RabbitListener(queues = "integral_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException { try { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); log.info(">>>messageId:{},msg:{}", messageId, msg); JSONObject jsonObject = JSONObject.parseObject(msg); String paymentId = jsonObject.getString("paymentId"); if (StringUtils.isEmpty(paymentId)) { log.error(">>>>支付id不能为空 paymentId:{}", paymentId); basicNack(message, channel); return; } // 使用paymentId查询是否已经增加过积分 网络重试间隔 IntegralEntity resultIntegralEntity = integralMapper.findIntegral(paymentId); if (resultIntegralEntity != null) { log.error(">>>>paymentId:{}已经增加过积分", paymentId); // 已经增加过积分,通知MQ不要在继续重试。 basicNack(message, channel); return; } Integer userId = jsonObject.getInteger("userId"); if (userId == null) { log.error(">>>>paymentId:{},对应的用户userId参数为空", paymentId); basicNack(message, channel); return; } Long integral = jsonObject.getLong("integral"); if (integral == null) { log.error(">>>>paymentId:{},对应的用户integral参数为空", integral); return; } IntegralEntity integralEntity = new IntegralEntity(); integralEntity.setPaymentId(paymentId); integralEntity.setIntegral(integral); integralEntity.setUserId(userId); integralEntity.setAvailability(1); // 插入到数据库中 int insertIntegral = integralMapper.insertIntegral(integralEntity); if (insertIntegral > 0) { // 手动签收消息,通知mq服务器端删除该消息 basicNack(message, channel); } // 采用重试机制 } catch (Exception e) { log.error(">>>>ERROR MSG:", e.getMessage()); basicNack(message, channel); } } // 消费者获取到消息之后 手动签收 通知MQ删除该消息 private void basicNack(Message message, Channel channel) throws IOException { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
4.测试
依次启动Eureka注册中心、xxlsso单点登录系统、member会员服务、pay支付服务、pay-web支付门户服务、还有integral积分服务,启动后如下图:
启动RabbitMQ服务(我的是Mac系统,已经启动的可以忽略):
cd /usr/local/Cellar/rabbitmq/3.8.2/sbin ./rabbitmq-server -detached
①模拟新增订单,浏览器输入:http://localhost:8600/cratePayToken?payAmount=999&orderId=20200513141452&userId=27&productName=玉米香肠
②确认提交订单,浏览器输入:http://localhost:8079/pay?payToken=pay_88c6262f3a494ae98d0873283514abf5
可以看到当前数据库,订单状态为未支付:
③按照提示,使用银联支付,一步一步直至支付完成:
可以看到,订单支付状态为已支付(也就是说订单支付状态补偿消费者已经接收到消息,并处理订单为已支付):
而且积分表也增加了一条数据(也是是说增加积分消费者已收到消息,并增加了一条积分数据):
本文完!