19、分布式事务

简介: 分布式事务

订单支付流程:

提交订单及支付流程:


1、用户提交订单需要先登录系统

2、提交订单将订单信息保存到订单数据库

订单提交成功,向订单主表保存一条记录,向订单明细表保存一条或多条记录,向订单支付表插入一条标记订单状态的记录

3、订单支付,调用微信支付接口完成支付保存支付信息

4、完成支付,微信支付系统回调通知学成在线支付结果

5、学成在线接收到支付结果通知,更新支付结果


支付完成,收到微信支付系统的支付完成通知或请求微信查询支付已完成,更新学成在线订单支付表中的支付状态字段


自动选课的需求分析:

用户支付完成会将支付状态及订单状态保存在订单数据库中,由订单服务去维护订单数据库,而学生选课信息在学习中心数据库,由学习服务去维护学习中心数据库的信息

如何实现两个分布式服务(订单服务、学习服务)共同完成一件事即订单支付成功自动添加学生选课的需求,这里的关键是如何保证两个分布式服务的事务的一致性

为解决上边的需求,在订单服务中远程调用选课接口,具体实现:


订单支付结果通知方法{

更新支付表中支付状态为“成功”

远程调用选课接口添加选课记录


为保存事务上边两步操作由spring控制事务,当遇到Exception异常则回滚本地数据库操作

但涉及到如下问题:


1、如果更新支付表失败则抛出异常,不再执行远程调用,此设想没有问题

2、如果更新支付表成功,网络远程调用超时会拉长本地数据库事务时间,影响数据库性能

3、如果更新支付表成功,远程调用添加选课成功(选课数据库commit成功),最后更新支付表commit失败,此 时出现操作不一致

上边的问题涉及到分布式事务控制


分布式事务

1、什么是分布式系统?

部署在不同结点上的系统通过网络交互来完成协同工作的系统

比如:充值加积分的业务,用户在充值系统向自己的账户充钱,在积分系统中自己积分相应的增加。充值系统和积分系统是两个不同的系统,一次充值加积分的业务就需要这两个系统协同工作来完成


2、什么是事务?

事务: 在某些业务需求中,一系列操作必须全部执行,而不能仅执行一部分

比如在执行多条SQL时,需要两个SQL同时执行成功才能更新表数据,事务就是为了解决这个问题


事务的ACID特性

1、原子性(Atomicity):将多条SQL看做原子进行执行,要么全部执行成功,如果其中有失败的SQL则全部回滚到事务开始前的状态;比如A、B转账,不可能A钱少了,B钱没有增加。

2、一致性(Consistency):事务完成后,所有数据的状态都是一致的,即A账户只要减去了100,B账户则必定加上了100;A和B转账前后的数据总和要保持一致性。

3、隔离性(Isolation):在该事务执行过程中,每个事务作出的修改是与其他事务完全隔离的,对外界没有影响,比如第一条SQL执行的结果在第二条SQL中查询不到,只有事务提交后其它事务才可以查询到最新的数据;两个事务之间是互不影响的,只有提交后才会查到新数据。

4、持久性(Durability):事务完成后,对数据的改变会永久性的存储起来,即使发生断电宕机数据依然在。A、B转账完成后会把数据持久化存储到数据库。


3、什么是本地事务?

本地事务就是用关系数据库来控制事务,传统的单体应用通常会将数据全部存储在一个数据库中,会借助关系数据库来完成事务控制。


4、什么是分布式事务?

单库多系统:在分布式系统中一次操作由多个系统协同完成,这种一次事务操作涉及多个系统通过网络协同完成的过程称为分布式事务。这里强调的是多个系统通过网络协同完成一个事务的过程,并不强调多个系统访问了不同的数据库,即使多个系统访问的是同一个数据库也是分布式事务,如下图:

多库多系统:另外一种分布式事务的表现是,一个应用程序使用了多个数据源连接了不同的数据库,当一次事务需要操作多个数据源,此时也属于分布式事务,当系统作了数据库拆分后会出现此种情况。

上面两种分布式事务表现形式以第一种居多。


5、分布式事务有哪些场景?

1 ) 电商系统中的下单扣库存,在电商系统中,订单系统和库存系统是两个系统,一次下单的操作由两个系统协同完成

2)金融系统中的银行卡充值,在金融系统中通过银行卡向平台充值需要通过银行系统和金融系统协同完成

3)教育系统中下单选课业务,在线教育系统中,用户购买课程,下单支付成功后学生选课成功,此事务由订单系统和选课系统协同完成

4)SNS系统的消息发送,在社交系统中发送站内消息同时发送手机短信,一次消息发送由站内消息系统和手机通信系统协同完成。


CAP理论

分布式系统在设计时只能在一致性(Consistency)、可用性(Availability)、分区容忍性(Partition Tolerance)中满足两种,无法兼顾三种。

通过下图理解CAP理论:

一致性(Consistency):服务A、B、C三个结点都存储了用户数据, 三个结点的数据需要保持同一时刻数据的一致性

可用性(Availability):服务A、B、C三个结点,其中一个结点宕机不影响整个集群对外提供服务,如果只有服务A结点,当服务A宕机整个系统将无法提供服务,增加服务B、C是为了保证系统的可用性

分区容忍性(Partition Tolerance):分区容忍性就是允许系统通过网络协同工作,分区容忍性要解决由于网络分区导致数据的不完整及无法访问等问题

分布式系统不可避免的出现了多个系统通过网络协同工作的场景,结点之间难免会出现网络中断、网延延迟等现象,这种现象一旦出现就会导致数据被分散在不同的结点上,这就是网络分区


两阶段提交协议(2PC)

为解决分布式系统数据一致性问题出现了两阶段提交协议(2 Phase Commitment Protocol),两阶段提交由协调者和参与者组成,共经过两个阶段和三个操作,部分关系数据库如Oracle、MySQL支持两阶段提交协议,本节 讲解关系数据库两阶段提交协议。

参考:

2PC:https://en.wikipedia.org/wiki/Two-phase_commit_protocol


消息队列实现最终一致

本方案是将分布式事务拆分成多个本地事务来完成,并且由消息队列异步协调完成,如下图: 下边以下单减少库存为例来说明:

1、订单服务和库存服务完成检查和预留资源

2、订单服务在本地事务中完成添加订单表记录和添加“减少库存任务消息”

3、由定时任务根据消息表的记录发送给MQ通知库存服务执行减库存操作

4、库存服务执行减少库存,并且记录执行消息状态(为避免重复执行消息,在执行减库存之前查询是否执行过此 消息)

5、库存服务向MQ发送完成减少库存的消息

6、订单服务接收到完成库存减少的消息后删除原来添加的“减少库存任务消息”

实现最终事务一致要求:预留资源成功理论上要求正式执行成功,如果执行失败会进行重试,要求业务执行方法实 现幂等


Spring Task定时任务

根据分布式事务的研究结果,订单服务需要定时扫描任务表向MQ发送任务。本节研究定时任务处理的方案,并实现定时任务扫描任务表并向MQ发送消息

实现定时任务的方案如下:


1、使用jdk的Timer和TimerTask实现,可以实现简单的间隔执行任务,无法实现按日历去调度执行任务

2、使用Quartz实现,Quartz 是一个异步任务调度框架,功能丰富,可以实现按日历调度

3、使用Spring Task实现,Spring 3.0后提供Spring

Task实现任务调度,支持按日历调度,相比Quartz功能稍简单,但是在开发基本够用,支持注解编程方式


本项目使用Spring Task实现任务调度,具体使用方法:

SpringBoot2.0实现定时任务

1、在启动类上添加注解:@EnableScheduling

2、启动类中添加测试方法task1:

private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
    // @Scheduled(fixedRate = 5000) //调用该方法之后5秒再次执行该方法
    // @Scheduled(fixedDelay = 5000) //该方法执行结束等5秒后再次执行该方法
    // @Scheduled(initialDelay=3000, fixedRate=5000) //第一次延迟3秒,以后每隔5秒执行一次
    @Scheduled(cron = "0/3 * * * * *")//每隔3秒执行一次,与fixedRate相同
    public void task1() {
        LOGGER.info("===============测试定时任务1开始===============");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LOGGER.info("===============测试定时任务1结束===============");
    }

3、测试定时任务是串行还是并行,添加测试方法task2:

@Scheduled(fixedRate = 3000) //上次执行开始时间后5秒执行
    public void task2() {
        LOGGER.info("===============测试定时任务2开始===============");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LOGGER.info("===============测试定时任务2结束===============");
    }

测试结果显示任务1执行完执行任务2,说明定时任务默认是串行执行的


Spring Task并行任务

当项目通常需要多个不同任务并行去执行的时候需要用到

创建异步任务配置类,需要配置线程池实现多线程调度任务

@Configuration
@EnableScheduling
public class AsyncTaskConfig implements SchedulingConfigurer, AsyncConfigurer {
    //线程池线程数量
    private int corePoolSize = 5;
    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();//初始化线程池
        scheduler.setPoolSize(corePoolSize);//线程池容量
        return scheduler;
    }
    @Override
    public Executor getAsyncExecutor() {
        Executor executor = taskScheduler();
        return executor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
    }
}

将@EnableScheduling添加到此配置类上,SpringBoot启动类上不用再添加@EnableScheduling。


订单服务定时发送消息

定时任务发送消息流程如下:

1、每隔1分钟扫描一次任务表。

2、定时任务扫描task表,一次取出多个任务,取出超过1分钟未处理的任务 3、考虑订单服务可能集群部署,为避免重复发送任务使用乐观锁的方式每次从任务列表取出要处理的任务

4、任务发送完毕后更新任务发送时间

关于任务表的添加:

正常的流程是订单支付成功后更新订单支付状态并向任务表写入“添加选课任务”。

目前订单支付功能没有开发,采用手动向任务表添加任务。


RabbitMQ配置

向RabbitMQ声明两个队列:添加选课、完成选课,交换机使用路由模式,配置RabbitMQConfig配置类。

Dao

在XcTaskRepository中自定义方法findByUpdateTimeBefore

//取出指定时间之前的记录 
Page<XcTask> findByUpdateTimeBefore(Pageable pageable,Date updateTime);

Service

//查询前n条任务
public List<XcTask> findXcTaskList(Date updateTime,int size){
    //设置分页参数
    Pageable pageable = new PageRequest(0,size);
    //查询前n条任务
    Page<XcTask> all = xcTaskRepository.findByUpdateTimeBefore(pageable, updateTime);
    List<XcTask> list = all.getContent();
    return list;
}

编写任务类,每分钟执行任务,启动订单工程,观察定时发送消息日志,观察rabbitMQ队列中是否有消息,在ChooseCourseTask类中配置sendChoosecourseTask方法:

@Scheduled(cron="0/3 * * * * *")
//定时发送加选课任务
public void sendChoosecourseTask(){
    //得到1分钟之前的时间
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(new Date());
    calendar.set(GregorianCalendar.MINUTE,-1);
    Date time = calendar.getTime();
    List<XcTask> xcTaskList = taskService.findXcTaskList(time, 100);
    System.out.println(xcTaskList);
    //调用service发布消息,将添加选课的任务发送给mq
    for(XcTask xcTask:xcTaskList){
        //取任务
        if(taskService.getTask(xcTask.getId(),xcTask.getVersion())>0){
            String ex = xcTask.getMqExchange();//要发送的交换机
            String routingKey = xcTask.getMqRoutingkey();//发送消息要带routingKey
            taskService.publish(xcTask,ex,routingKey);
        }
    }
}

启动工程,测试读取任务列表的功能。


定时发送任务

Dao

添加更新任务方法:

//更新updateTime
    @Modifying
    @Query("update XcTask t set t.updateTime = :updateTime where t.id = :id")
    public int updateTaskTime(@Param(value = "id") String id,@Param(value = "updateTime") Date updateTime);

Service

添加发送消息方法:

//发布消息
public void publish(XcTask xcTask,String ex,String routingKey){
    Optional<XcTask> optional = xcTaskRepository.findById(xcTask.getId());
    if(optional.isPresent()){
        rabbitTemplate.convertAndSend(ex,routingKey,xcTask);
        //更新任务时间
        XcTask one = optional.get();
        one.setUpdateTime(new Date());
        xcTaskRepository.save(one);
}

编写任务类,每分钟执行任务,启动订单工程,观察定时发送消息日志,观察rabbitMQ队列中是否有消息,ChooseCourseTask类中添加sendChoosecourseTask方法:

@Scheduled(cron="0/3 * * * * *")
//定时发送加选课任务
public void sendChoosecourseTask(){
    //得到1分钟之前的时间
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(new Date());
    calendar.set(GregorianCalendar.MINUTE,-1);
    Date time = calendar.getTime();
    List<XcTask> xcTaskList = taskService.findXcTaskList(time, 100);
    System.out.println(xcTaskList);
    //调用service发布消息,将添加选课的任务发送给mq
    for(XcTask xcTask:xcTaskList){
        //取任务
        if(taskService.getTask(xcTask.getId(),xcTask.getVersion())>0){
            String ex = xcTask.getMqExchange();//要发送的交换机
            String routingKey = xcTask.getMqRoutingkey();//发送消息要带routingKey
            taskService.publish(xcTask,ex,routingKey);
        }
    }
}


乐观锁取任务

考虑订单服务将来会集群部署,为了避免任务在1分钟内重复执行,这里使用乐观锁,实现思路如下:


每次取任务时判断当前版本及任务id是否匹配,如果匹配则执行任务,如果不匹配则取消执行。

如果当前版本和任务Id可以匹配到任务则更新当前版本加1.

1、在Dao中增加校验当前版本及任务id的匹配方法

@Modifying
@Query("update XcTask t set t.version = :version+1 where t.id = :id and t.version = :version")
public int updateTaskVersion(@Param(value = "id") String id,@Param(value = "version") int version);


2、在service中增加方法,使用乐观锁方法校验任务

//获取任务
@Transactional
public int getTask(String id,int version){
    //通过乐观锁的方式来更新数据表,如果结果大于0说明取到任务
    int count = xcTaskRepository.updateTaskVersion(id, version);
    return count;
}

3、执行任务类中修改

//取任务
if(taskService.getTask(xcTask.getId(),xcTask.getVersion())>0){
    String ex = xcTask.getMqExchange();//要发送的交换机
    String routingKey = xcTask.getMqRoutingkey();//发送消息要带routingKey
    taskService.publish(xcTask,ex,routingKey);
}


自动添加选课开发

1、需求分析

学习服务接收MQ发送添加选课消息,执行添加选课操作。

添加选课成功向学生选课表插入记录、向历史任务表插入记录、并向MQ发送“完成选课”消息。

2、 RabbitMQ配置

学习服务监听MQ的添加选课队列,并且声明完成选课队列,配置代码同订单服务中RabbitMQ配置

3、Dao

学生选课Dao:

在XcLearningCourseRepository类中配置findXcLearningCourseByUserIdAndCourseId方法

历史任务Dao:

配置XcTaskHisRepository接口

4、Service

1、添加选课方法

向xc_learning_course添加记录,为保证不重复添加选课,先查询历史任务表,如果从历史任务表查询不到任务说 明此任务还没有处理,此时则添加选课并添加历史任务。

在CourseService类中添加addCoursePic方法

2、接收添加选课消息

接收到添加选课的消息调用添加选课方法完成添加选课,并发送完成选课消息。

在com.xuecheng.learning.mq包下添加ChooseCourseTask类,在该类中添加receiveChoosecourseTask方法


订单服务结束任务

1、需求分析

订单服务接收MQ完成选课的消息,将任务从当前任务表删除,将完成的任务添加到完成任务表。

2、Dao

1、删除xc_task

2、添加xc_task_his

3、Service

在TaskService中定义删除任务方法finishTask

4、接收完成选课消息

在com.xuecheng.manage_order.mq包下ChooseCourseTask类中添加receiveChoosecourseTask方法,接收完成选 课任务消息并进行处理。

5、集成测试

测试流程如下:

1、手动向任务表添加一条任务。

2、启动rabbitMQ.

3、启动订单服务、选课服务。

4、观察日志是否添加选课成功

完成任务后将xc_task任务移动到xc_task_his表中

完成任务后在选课表中多了一条学生选课记录


测试消息重复消费:

1、手动向任务表添加一条任务。

2、启动rabbitMQ.

3、先启动订单表,等待消息队列是否积累了多个消息。

4、再启动选课服务,观察是否重复添加选课


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
数据库
分布式事务(一)
分布式事务(一)
|
2月前
|
消息中间件 数据库
分布式事务(二)
分布式事务(二)
|
2月前
|
数据库 微服务
分布式事务系列(三)
分布式事务系列(三)
|
2月前
|
消息中间件 关系型数据库 调度
分布式事务系列(二)
分布式事务系列(二)
|
3月前
|
消息中间件 数据库 RocketMQ
关于分布式事务的理解
关于分布式事务的理解
38 0
|
SQL 存储 监控
浅谈分布式事务
浅谈分布式事务
49 0
|
存储 算法 网络协议
一文了解分布式事务
一文了解分布式事务
|
消息中间件 存储 Oracle
浅析分布式事务
分布式事务的概念讲解以及常用解决方案
169 0
浅析分布式事务
|
消息中间件 SQL 存储
分布式事务是什么?
分布式事务是什么?
134 0
分布式事务是什么?