分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)

分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(上):https://developer.aliyun.com/article/1419997


最大努力通知型分布式事务实战_实现充值微服务



主要实现功能


1、充值接口

2、查询充值结果接口


创建父项目rocketmq-notifymsg


创建子工程


引入依赖

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- 引入nacos依赖 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>


编写主启动类

@EnableDiscoveryClient
@MapperScan("com.tong.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
    public static void main(String[] args) {
       SpringApplication.run(PayMain7071.class,args);
        log.info("*********** 充值服务启动成功*********");
   }
}


编写配置文件

server:
 port: 7071
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-pay
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: payment-group


最大努力通知型分布式事务_充值微服务之业务层实现



充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。


编写充值接口

public interface IPayInfoService extends IService<PayInfo> {
    /**
     * 保存充值信息
     */
    PayInfo savePayInfo(PayInfo payInfo);
    /**
     * 查询指定的充值信息
     */
    PayInfo getPayInfoByTxNo(String txNo);
}


充值接口实现

@Slf4j
@Service
public class PayInfoServiceImpl extends
ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
    @Resource
    private PayInfoMapper payInfoMapper;
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public PayInfo savePayInfo(PayInfo payInfo)
{
      payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
        payInfo.setPayResult("success");
      payInfo.setPayTime(LocalDateTime.now());
        int count = payInfoMapper.insert(payInfo);
        //充值信息保存成功
        if(count > 0){
            log.info("充值微服务向账户微服务发送结果消息");
            //发送消息通知账户微服务
            rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo));
            return payInfo;
       }
        return null;
   }
    @Override
    public PayInfo getPayInfoByTxNo(String txNo) {
        return baseMapper.selectById(txNo);
   }
}


编写充值接口

@RestController
@RequestMapping("/payInfo")
public class PayInfoController {
    @Autowired
    private IPayInfoService payInfoService;
    /**
     * 充值
     * @param payInfo
     * @return
     */
    @GetMapping(value = "/pay_account")
    public PayInfo pay(PayInfo payInfo){
        //生成事务编号
        return payInfoService.savePayInfo(payInfo);
   }
    /**
     * 查询充值结果
     * @param txNo
     * @return
     */
    @GetMapping(value = "/query/payresult/{txNo}")
    public PayInfo payResult(@PathVariable("txNo") String txNo){
        return payInfoService.getPayInfoByTxNo(txNo);
   }
}


最大努力通知型分布式事务_实现账户微服务


创建子工程account


引入依赖

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!--   引入Nacos注册中心   -->
        <dependency>
           <groupId>com.alibaba.cloud</groupId>
           <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
        <!--   引入OpenFeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starteropenfeign</artifactId>
        </dependency>
        <!--   引入负载均衡器-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloudloadbalancer</artifactId>
        </dependency>
    </dependencies>


编写配置文件

server:
 port: 7070
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-account
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876


最大努力通知型分布式事务_账户微服务之业务层实现



RocketMQ消费充值信息

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
    @Autowired
    private IAccountInfoService accountInfoService;
    @Override
    public void onMessage(String message) {
        log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message));
        //如果是充值成功,则修改账户余额
        PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
      if("success".equals(payInfo.getPayResult())){
           accountInfoService.updateAccountBalance(payInfo);
       }
        log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
   }
}


编写账户操作接口

    /**
     * 更新账户余额
     */
    void updateAccountBalance(PayInfo payInfo);


实现账户操作接口

@Slf4j
@Service
public class AccountInfoServiceImpl extends
ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Resource
    private AccountInfoMapper accountInfoMapper;
    @Resource
    private PayInfoMapper payInfoMapper;
    /**
     *
     * @param payInfo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateAccountBalance(PayInfo payInfo) {
        if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
            log.info("账户微服务已经处理过当前事务...");
            return;
       }
        LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
        lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
        //更新账户余额
        List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
        if (accountInfos != null && !accountInfos.isEmpty()){
            AccountInfo accountInfo = accountInfos.get(0);
            accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
            accountInfoMapper.updateById(accountInfo);
       }
        //保存充值记录
        payInfoMapper.insert(payInfo);
   }
}


最大努力通知型分布式事务_账户微服务远程调用实现



主启动类加Feign注解

@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.tong.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
    public static void main(String[] args) {
       SpringApplication.run(AccountMain7070.class,args);
        log.info("*********** AccountMain7070启动成功 *********");
   }
}


编写远程调用接口

@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
    @GetMapping(value = "/payInfo/query/payresult/{txNo}")
    PayInfo payResult(@PathVariable("txNo") String txNo);
}


编写查询账户接口

    /**
     * 查询充值结果
     */
    PayInfo queryPayResult(String txNo);


实现查询账户信息

    /**
     * 查询结果
     * @param txNo
     * @return
     */
    @Override
    public PayInfo queryPayResult(String txNo)
      {
        try{
            return iPayFeignService.payResult(txNo);
       }catch (Exception e){
            log.error("查询充值结果异常:{}", e);
       }
        return null;
   }


编写查询充值结果接口

    /**
     * 主动查询充值结果
     * @param txNo
     * @return
     */
  @GetMapping(value = "/query/payresult/{txNo}")
    public ResponseEntity result(@PathVariable("txNo") String txNo){
        return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
   }


最大努力通知型分布式事务_测试程序


查看account库和payment库数据


启动账户和充值微服务


调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。



账户微服务的日志文件中输出如下信息


可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。


查询充值结果

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
5月前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
494 0
|
11天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
32 1
|
1月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
81 1
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
3月前
|
消息中间件 存储 负载均衡
微服务与分布式系统设计看这篇就够了!
【10月更文挑战第12天】 在现代软件架构中,微服务和分布式系统设计已经成为构建可扩展、灵活和可靠应用程序的主流方法。本文将深入探讨微服务架构的核心概念、设计原则和挑战,并提供一些关于如何在分布式系统中实现微服务的实用指导。
116 2
|
3月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
4月前
|
Dubbo Java 应用服务中间件
分布式-dubbo的入门
分布式-dubbo的入门
|
4月前
|
机器学习/深度学习 算法 自动驾驶
深度学习之分布式智能体学习
基于深度学习的分布式智能体学习是一种针对多智能体系统的机器学习方法,旨在通过多个智能体协作、分布式决策和学习来解决复杂任务。这种方法特别适用于具有大规模数据、分散计算资源、或需要智能体彼此交互的应用场景。
226 4