著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
分布式事务介绍
在分布式系统中实现的事务就是分布式事务,分布式系统的CAP原则是:
- 一致性
- 可用性
- 分区容错性
是分布式事务主要是保证数据的一致性,主要有三种不同的原则
- 强一致性
- 弱一致性
- 最终一致性
JTA与XA
共同点:
- Transaction Manager(事务管理器)
- XA Resource
- 两阶段提交
Orderservice监听新订单队列中的消息,获取之后新增订单,成功则往新订单缴费队列中写消息,中间新增订单的过程使用JTA事务管理,当新增失败则事务回滚,不会往新订单缴费队列中写消息;
再比如User service 扣费成功后,往新订单转移票队列写消息,这时Ticket service 正在处理中或者处理中发生了失败,这中间的过程中用户查看自己的余额已经扣费成功,但票的信息却没有,此时可以使用事务失败回滚的方式依次回退,这种叫弱一致性;又或者可以把处理失败的内容发送至一个错误队列中,由人工处理等方式解决,这种叫最终一致性。
Spring JTA分布式事务实现
- 可以使用如JBoss之类的应用服务器提供的JTA事务管理器
- 可以使用Atomikos、Bitronix等库提供的JTA事务管理器
不使用Spring JTA的分布式事务实现
为什么不使用JTA?
因为JTA采用两阶段提交方式,第一次是预备阶段,第二次才是正式提交。当第一次提交出现错误,则整个事务出现回滚,一个事务的时间可能会较长,因为它要跨越多个数据库多个数据资源的的操作,所以在性能上可能会造成吞吐量低。
不适用JTA,依次提交两事务
1.start message transaction 2.receive message 3.start database transaction 4.update database 5.commit database transaction 6.commit message transaction ##当这一步出现错误时,上面的因为已经commit,所以不会rollback
这时候就会出现问题
多个资源的事务同步方法
XA与最后资源博弈
1.start message transaction 2.receive message 3.start JTA transaction on DB 4.update database 5.phase-1 commit on DB transaction 6.commit message transaction ##当这一步出现错误时,上面的因为是XA的第一次提交预备状态,所以可以rollback 7.phase-2 commit on DB transaction ##当这一步出现错误时,因为message不是XA方式,commit后无法rollback
但这种相比不使用JTA,已经很大程度上避免了事务发生错误的可能性。
共享资源
- 两个数据源共享同一个底层资源
- 比如ActiveMQ使用DB作为底层资源存储
- 使用数据库的database transaction Manager事务管理器来控制事务提交
- 需要数据源支持指定底层资源存储方式
最大努力一次提交
- 依次提交事务
- 可能出错
- 通过AOP或Listener实现事务直接的同步
JMS最大努力一次提交+重试
- 适用于其中一个数据源是MQ,并且事务由读MQ消息开始
- 利用MQ消息的重试机制
- 重试的时候需要考虑重复消息
1.start message transaction 2.receive message 3.start database transaction 4.update database #数据库操作出错,消息被放回MQ队列,重试重新触发该方法 5.commit database transaction 6.commit message transaction
上面这种时候没有问题
1.start message transaction 2.receive message 3.start database transaction 4.update database 5.commit database transaction 6.commit message transaction #提交MQ事务出错,消息放回至MQ队列,重试重新触发该方法
可能存在问题:会重复数据库操作,因为database transaction不是使用JTA事务管理,所以database已经commit成功;如何避免,需要忽略重发消息,比如唯一性校验等手段。
链式事务管理
- 定义一个事务链
- 多个事务在一个事务管理器里依次提交
- 可能出错
如何选择(根据一致性要求)
- 强一致性事务:JTA(性能最差、只适用于单个服务内)
- 弱、最终一致性事务:最大努力一次提交、链式事务(设计相应的错误处理机制)
如何选择(根据场景)
- MQ-DB:最大努力一次提交+重试
- 多个DB:链式事务管理
- 多个数据源:链式事务、或其他事务同步方式
实例
实例1-DB-DB
application.properties中配置了两个数据源
# 默认的Datasource配置 # spring.datasource.url = jdbc:mysql://localhost:3307/user # spring.datasource.username = root # spring.datasource.password = 123456 # spring.datasource.driverClassName = com.mysql.jdbc.Driver spring.ds_user.url = jdbc:mysql://localhost:3307/js_user spring.ds_user.username = root spring.ds_user.password = 123456 spring.ds_user.driver-class-name = com.mysql.jdbc.Driver spring.ds_order.url = jdbc:mysql://localhost:3307/js_order spring.ds_order.username = root spring.ds_order.password = 123456 spring.ds_order.driver-class-name = com.mysql.jdbc.Driver
自定义配置类文件
@Configuration public class DBConfiguration{ @Bean @Primary @ConfigurationProperties(prefix="spring.ds_user") #设置读取在properties文件内容的前缀 public DataSourceProperties userDataSourceProperties() { return new DataSourceProperties(); } @Bean @Primary public DataSource userDataSource(){ return userDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build(); } @Bean public JdbcTemplate userJdbcTemplate(@Qualifier("userDataSource") DataSource userDataSource){ return new JdbcTemplate(userDataSource); } @Bean @ConfigurationProperties(prefix="spring.ds_order") #设置读取在properties文件内容的前缀 public DataSourceProperties orderDataSourceProperties() { return new DataSourceProperties(); } @Bean public DataSource orderDataSource(){ return userDataSourceProperties().initializeDataSourceBuilder().type(HikariDAtaSource.class).build(); } @Bean public JdbcTemplate orderJdbcTemplate(@Qualifier("orderDataSource") DataSource orderDataSource){ return new JdbcTemplate(orderDataSource); } }
Spring注解解释(@Primary、@Qualifier)
实际调用类
public class CustomerService{ @Autowired @Qualifier("userJdbcTemplate") private jdbcTemplate userJdbcTemplate; @Autowired @Qualifier("orderJdbcTemplate") private jdbcTemplate orderJdbcTemplate; private static final String UPDATE_CUSTOMER_SQL; private static final String INSERT_ORDER_SQL; @Transactional #事务管理注解 public void createOrder(Order order){ userJdbcTemplate.update(UPDATE_CUSTOMER_SQL, order) if(order.getTitle().contains("error1")){ #模拟异常出现 throw new RuntimeException("error1") } orderJdbcTemplate.update(INSERT_ORDER_SQL, order) #没有使用事务,直接提交 if(order.getTitle().contains("error2")){ #模拟异常出现 throw new RuntimeException("error2") } } }
关于上述过程的详细说明:
因为使用了标签 @Transactional的方式,使其在一个事务里面执行
com . imooc . examp le . springdtx . service . customerservice Without server ' s identity verification is not recommended . According to MySQL 5.5.45+,5.6.26+ and 5.7.6t requirements SsL co ixxer .h1kar1.H1kar1DataSource 11kar1Poot-1 roxyConnectione1762075984aofno.JDBC4conn . j . o .Dataso0rcelrahsactqnranageh Acquired Connection IHikariPr anSaCtlOhiMMahage Switching JDBC Connection ( HikariProxyConnection @1762075984 wrapping com . mysql . jdbc . JDa Executing prepared SQL statement [ uPDATE customer SET deposit = deposit -? where id = dDC .C0re. Jdb Lupdate 执行 sql dbc . core . JdbcTemplate s . jdbc . core . JdbcTemptae s . jdbc . datasource . DataSourceutiis com . Zaxxer .h1Kari. HikariDataSource : coting brepared SoL statement [ INsERT INro customer _ order ( customer -1d,t1te, Hikar1Po0l-2- Starting ...
: Executing prepared SQL statement IuPDATE customer SET deposit = deposit -? where id - n SQL update affected 1 Executing prepared SQL update : Executing prepared SQL statement IINSERT INT0 customer _ order ( customer _ id , title , amount ) VALUEs (?,?,?) Fetching JDBC Connection from DataSource : HikariPool -2- Starting ... asourceutils iDataSOUrCe dentity verification is not recommended . According to MySQL 5.5,45+,5,6.26+ and 5.7.6+ requirements SSL connection must be 0ataSo0C ourceutils aSourceUtils actionManager actionManager HikariPool -2 Registering transaction synchronization for JoBc connection SQL update affected 1 rows Returning JDBc connection to DataSource Initiating ti " ansaction commit : committing JDBC transaction on Connection IHikariProxyConnection @1762075984 wrapping cone beac Start comDleted
也就是同步到Transaction Manager上面,但是这边的同步不是说事务的同步,只是同步数据库连接的开关
Registering transaction synchronization for JoBc connection 5QL update affected 1 row Returning JDBc Connection to DataSource Tnmiating Tpanaransaction on connectionsaH1k4iP06C6nheCIahe1762075984 wrapping com . mysql . jdbc .JDBc4Conr C Connection [ HikariProxyConnection @1762075984 wrapping com . mysql . jdbc .JDBC4Connection@75357b82]: unina jpBc Connection to DataS0 cation is not recommended , According to MySQL 5,5:4343.6.26+ and 5.7.6t requirements ssL connection must be estat
特别说明: @Transactional 如果没有做任何配置的情况下,则会使用DBConfiguration类中@Primart注解下的DataSource,用它去做datasource connection
spring DataSourceUtils 使用已有的connection,只是控制数据库连接的释放,不是事务。
实例2-DB-DB.链式事务管理器
链式事务管理器在 这个库里面
< dependency > < groupId > org . springframework . data </ groupId >< artifactId > spring - data - commons </ artifactId >< version >1.13.7.RELEASE</ version > </ dependency >
DBConfiguration类中添加一段
@Bean public PlatformTransactionManager transactionManager(){ DataSourceTransactionManager userTM = new DataSourceTransactionManager(userDataSource()) #看似方法调用,实则从spring容器中获取 DataSourceTransactionManager orderTM = new DataSourceTransactionManager(orderDataSource()) # orderTM.setDataSource(orderDataSource()) 如果使用这种方式则不是从容器中去获取了,因为orderTM不是spring容器管理 ChainedTransactionManager tm = new ChainedTransactionManager(userTM, orderTM) ## order先执行,user后执行 return tm; }
链接事务管理器(Chaining transaction managers)
出现异常是否会有问题呢?
- 使用debug方式模拟运行,第一个order事务提交以后,第二user个事务执行的时候把mysql服务给停掉,出现如下异常
com . mysql . jdbc . exceptions .jdbc4.MySQLNonTransientConnectionExceptior: communications link failure during commit (). : com . mysql . jdbc . Utit . handlteNewInstance ( Util , iava :404): com . mysql . jdbc . Util . getInstance ( Util , iava :387)
- 重启启动msyql服务,程序继续运行,此时来看数据库order表中多了一条记录,而user表没有变化;第一个order事务并没有回滚;那如果是rollback的时候停掉mysql服务,其实是没有影响的,因为本身就没有commit, 执不执行rollback本身是没有影响的。
git代码地址 ☚
实例3-JPA-DB.链式事务管理器
- mysql + mysql
- 链式事务:JpaTransactionManager + DataSourceTransactionMananger
- 不处理重试
基于实例1的核心代码继续做修改演示:
git代码地址 ☚
实例4-JMS-DB.最大努力一次提交
- JMS-DB
- ActiveMQ + Mysql
- 最大努力一次提交:TransactionAwareConnectionFactoryProxy
分布式系统唯一性
什么是分布式系统ID?
- 分布式系统的全局唯一标识
- UUID:生成唯一ID的规范
- 用于唯一标识,处理重复消息
分布式系统唯一性ID生成策略:
- 数据库自增序列
- UUID:唯一ID标准,128位,几种生成方式(时间+版本等方式)
- MongDB的ObjectID:时间戳+机器ID+进程ID+序号
- Redis的INCR操作、Zookeeper节点的版本号
使用何种方式?
- 自增的ID:需要考虑安全性、部署
- 时间有序:便于通过ID判断创建时间
- 长度、是否数字类型:是否建立索引
分布式系统分布式对象
- Redis:Redisson库:RLock,RMap,RQueue等对象
- Zookeeper:Netflix Curator库:Lock,Queue等对象
分布式事务实现模式
- 消息驱动模式:Message Driven
- 事件溯源模式:Event Sourcing
- TCC模式:Try-Confirm-Cancel
幂等性
- 幂等操作:任意多次执行所产生的影响,与一次执行的影响相同
- 方法的幂等性:使用同样的参数调用一次方法多次,与调用一次结果相同
- 接口的幂等性:接口被重复调用,结果一致
微服务接口的幂等性
- 重要性:经常需要通过重试实现分布式事务的最终一致性
- GET方法不会对系统产生副作用,具有幂等性
- POST、PUT、DELETE方法的实现需要满足幂等性
Service方法实现幂等性
public OrderService{ Map disMap; # 用于存放已经处理的id @Transactional void ticketOrder(BuyTickerDTO dto){ String uid = createUUID(dto); # 创建并获取数据的唯一id if(!diMap.contains(uuid){ #disMap还没有处理过这个数据唯一id,则进入创建 Order order = createOrder(dto); disMap.append(uid) ## 追加Map } } userService.charge(dto); #调用user微服务 }
SQL实现幂等性
#通过调节限定,只有第一次支付的时候才会扣余额,被重复调用的时候就不会重复扣费用,通过paystatus判断 UPDATE customer SET deposit = deposit - ${value}, paystatus = 'PAID' WHERE orderId = ${id} and paystatus = 'UNPAID'