分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(上):https://developer.aliyun.com/article/1419997
最大努力通知型分布式事务实战_实现充值微服务
主要实现功能
1、充值接口
2、查询充值结果接口
创建父项目rocketmq-notifymsg
创建子工程
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.1</version> </dependency> <!-- 引入nacos依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId> </dependency> </dependencies>
编写主启动类
@EnableDiscoveryClient @MapperScan("com.tong.payment.mapper") @SpringBootApplication @Slf4j public class PayMain7071 { public static void main(String[] args) { SpringApplication.run(PayMain7071.class,args); log.info("*********** 充值服务启动成功*********"); } }
编写配置文件
server: port: 7071 spring: cloud: nacos: discovery: server-addr: 192.168.66.100:8848 application: name: tx-notifymsg-pay datasource: url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password01: 123456 driver-class-name: com.mysql.cj.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876 producer: group: payment-group
最大努力通知型分布式事务_充值微服务之业务层实现
充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。
编写充值接口
public interface IPayInfoService extends IService<PayInfo> { /** * 保存充值信息 */ PayInfo savePayInfo(PayInfo payInfo); /** * 查询指定的充值信息 */ PayInfo getPayInfoByTxNo(String txNo); }
充值接口实现
@Slf4j @Service public class PayInfoServiceImpl extends ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService { @Resource private PayInfoMapper payInfoMapper; @Resource private RocketMQTemplate rocketMQTemplate; @Override public PayInfo savePayInfo(PayInfo payInfo) { payInfo.setTxNo(UUID.randomUUID().toString().replace("-","")); payInfo.setPayResult("success"); payInfo.setPayTime(LocalDateTime.now()); int count = payInfoMapper.insert(payInfo); //充值信息保存成功 if(count > 0){ log.info("充值微服务向账户微服务发送结果消息"); //发送消息通知账户微服务 rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo)); return payInfo; } return null; } @Override public PayInfo getPayInfoByTxNo(String txNo) { return baseMapper.selectById(txNo); } }
编写充值接口
@RestController @RequestMapping("/payInfo") public class PayInfoController { @Autowired private IPayInfoService payInfoService; /** * 充值 * @param payInfo * @return */ @GetMapping(value = "/pay_account") public PayInfo pay(PayInfo payInfo){ //生成事务编号 return payInfoService.savePayInfo(payInfo); } /** * 查询充值结果 * @param txNo * @return */ @GetMapping(value = "/query/payresult/{txNo}") public PayInfo payResult(@PathVariable("txNo") String txNo){ return payInfoService.getPayInfoByTxNo(txNo); } }
最大努力通知型分布式事务_实现账户微服务
创建子工程account
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.1</version> </dependency> <!-- 引入Nacos注册中心 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId> </dependency> <!-- 引入OpenFeign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starteropenfeign</artifactId> </dependency> <!-- 引入负载均衡器--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloudloadbalancer</artifactId> </dependency> </dependencies>
编写配置文件
server: port: 7070 spring: cloud: nacos: discovery: server-addr: 192.168.66.100:8848 application: name: tx-notifymsg-account datasource: url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account? useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec t=true&failOverReadOnly=false&useSSL=false username: root password01: 123456 driver-class-name: com.mysql.jdbc.Driver ################ RocketMQ 配置 ########## rocketmq: name-server: 192.168.66.100:9876
最大努力通知型分布式事务_账户微服务之业务层实现
RocketMQ消费充值信息
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg") public class NotifyMsgAccountListener implements RocketMQListener<String> { @Autowired private IAccountInfoService accountInfoService; @Override public void onMessage(String message) { log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message)); //如果是充值成功,则修改账户余额 PayInfo payInfo = JSON.parseObject(message, PayInfo.class); if("success".equals(payInfo.getPayResult())){ accountInfoService.updateAccountBalance(payInfo); } log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo)); } }
编写账户操作接口
/** * 更新账户余额 */ void updateAccountBalance(PayInfo payInfo);
实现账户操作接口
@Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Resource private AccountInfoMapper accountInfoMapper; @Resource private PayInfoMapper payInfoMapper; /** * * @param payInfo */ @Transactional(rollbackFor = Exception.class) @Override public void updateAccountBalance(PayInfo payInfo) { if(payInfoMapper.selectById(payInfo.getTxNo()) != null){ log.info("账户微服务已经处理过当前事务..."); return; } LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo()); //更新账户余额 List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper); if (accountInfos != null && !accountInfos.isEmpty()){ AccountInfo accountInfo = accountInfos.get(0); accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount())); accountInfoMapper.updateById(accountInfo); } //保存充值记录 payInfoMapper.insert(payInfo); } }
最大努力通知型分布式事务_账户微服务远程调用实现
主启动类加Feign注解
@EnableDiscoveryClient @EnableFeignClients @MapperScan("com.tong.account.mapper") @SpringBootApplication @Slf4j public class AccountMain7070 { public static void main(String[] args) { SpringApplication.run(AccountMain7070.class,args); log.info("*********** AccountMain7070启动成功 *********"); } }
编写远程调用接口
@Service @FeignClient("tx-notifymsg-pay") public interface IPayFeignService { @GetMapping(value = "/payInfo/query/payresult/{txNo}") PayInfo payResult(@PathVariable("txNo") String txNo); }
编写查询账户接口
/** * 查询充值结果 */ PayInfo queryPayResult(String txNo);
实现查询账户信息
/** * 查询结果 * @param txNo * @return */ @Override public PayInfo queryPayResult(String txNo) { try{ return iPayFeignService.payResult(txNo); }catch (Exception e){ log.error("查询充值结果异常:{}", e); } return null; }
编写查询充值结果接口
/** * 主动查询充值结果 * @param txNo * @return */ @GetMapping(value = "/query/payresult/{txNo}") public ResponseEntity result(@PathVariable("txNo") String txNo){ return ResponseEntity.ok(accountInfoService.queryPayResult(txNo)); }
最大努力通知型分布式事务_测试程序
查看account库和payment库数据
启动账户和充值微服务
调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。
账户微服务的日志文件中输出如下信息
可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。
查询充值结果