@[TOC]
一、前言
更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
- 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
- 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
Seata最核心的全局事务执行流程,上文我们聊了本地事务是如何执行的?在本地事务执行的过程中涉及到分支事务如何注册到全局事务、undo log的构建,本文我们接着聊分支事务如何注册到全局事务。
二、RM中分支事务注册入口
在上一文(分布式事务Seata源码解析八:本地事务执行流程(AT模式下))中,提到ConnectionProxy#processGlobalTransactionCommit()
最终处理本地事务的提交。
其中register()
方法向远程的TC中注册分支事务:
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
// 分支事务注册:将事务类型AT、资源ID(资源在前面的流程已经注册过了)、事务xid、全局锁keys作为分支事务信息注册到seata server
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
}
注册分支事务、获取分支事务ID的入口流程如下:
- DefaultResourceManager.get() 获取单例形式的资源管理器
DefaultResourceManager
,通过其注册分支事务; - 再根据分支类型(AT、TCC、XA、SAGA)获取相应类型的ResourceManager;
因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种ResourceManager:
这四种ResourceManager都继承了AbstractResourceManager
,并且都没有重写AbstractResourceManager
的branchRegister()
方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractResourceManager
的branchRegister()
方法中;而分支事务的提交和回滚方式却各不相同。
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
try {
BranchRegisterRequest request = new BranchRegisterRequest();
// xid是全局事务ID
request.setXid(xid);
// 分布式事务要更新数据的全局锁keys
request.setLockKey(lockKeys);
// 分支事务对应的资源ID
request.setResourceId(resourceId);
// 分支事务类型
request.setBranchType(branchType);
// 引用的数据
request.setApplicationData(applicationData);
// 将请求通过RmNettyRemotingClient发送到seata-server
BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
}
return response.getBranchId();
} catch (TimeoutException toe) {
throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
} catch (RuntimeException rex) {
throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
}
}
方法中构建一个BranchRegisterRequest
,通过netty将请求发送到TC进行分支事务的注册;
三、TC中处理分支事务注册
在【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;
又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。
ServerHandler类上有个@ChannelHandler.Sharable
注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。
processMessage(ctx, (RpcMessage) msg)
方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。
/**
* Rpc message processing.
*
* @param ctx Channel handler context.
* @param rpcMessage rpc message.
* @throws Exception throws exception process message error.
* @since 1.3.0
*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息对应的处理器设置了线程池,则放到线程池中执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池拒绝策略之一,抛出异常:RejectedExecutionException
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
// 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
Seata Serer接收到请求的执行链路为:
1、BranchRegisterRequest
又由于RM发送开启事务请求时的RPCMessage的body为BranchRegisterRequest,所以进入到:
又由于在DefaultCoordinator#onRequest()
方法中,将DefaultCoordinator
自身绑定到了AbstractTransactionRequestToTC
的handler
属性中:
所以会进入到:
DefaultCore封装了AT、TCC、Saga、XA分布式事务模式的具体实现类。
2、DefaultCore执行分支事务的注册
DefaultCore#branchRegister()方法中会首先根据分布式事务模式获取到相应的AbstractCore
,这里的处理方式和上面获取分布式事务模式对应的ResourceManager
的处理方式一样;
因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种AbstractCore:
这四种Core都继承了AbstractCore
,并且都没有重写AbstractResourceManager
的branchRegister()
方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractCore
的branchRegister()
方法中;
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
// 根据xid从DB中找到全局事务会话(会做一个DB查询操作)
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
// 检查全局事务会话的状态
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 分支事务会话 根据全局事务开启一个分支事务
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
// 将branchID放到ThreadLocal中
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
// 给分支事务加全局锁
branchSessionLock(globalSession, branchSession);
try {
// 将分支事务会话添加到全局事务会话
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
return branchSession.getBranchId();
});
}
方法做的事情如下:
- 根据xid从DB / file / redis (由TC的
store.mode
配置决定)中找到全局事务会话(这里会做一个DB查询操作),并且断言globalSession不许为空;- 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
- 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常
BranchTransactionException
;- 将分支事务会话添加到全局事务会话,持久化分支事务会话;
1)获取全局事务会话GlobalSession
SessionHolder是会话管理者,其中包括四个会话管理器:
// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
用于获取全局事务会话的管理器为:ROOT_SESSION_MANAGER
;在初始化SessionHolder时,会根据store.mode对其进行赋值:
例如博主的TC采用的store.mode是DB,所以找到:DataBaseSessionManager;
DataBaseSessionManager#findGlobalSession()方法如下:
==注意:在AbstractCore#branchRegister()方法中查询全局事务会话时,withBranchSessions = false,所以不会把分支事务查出来。==
LogStore
LogStore是对全局事务、分支事务做DB操作的DAO层;LogStore接口只有一个实现LogStoreDataBaseDAO
,其queryGlobalTransactionDO()
方法内容很简单,就直接使用JDBC查表:
@Override
public GlobalTransactionDO queryGlobalTransactionDO(String xid) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1, xid);
rs = ps.executeQuery();
if (rs.next()) {
return convertGlobalTransactionDO(rs);
} else {
return null;
}
} catch (SQLException e) {
throw new DataAccessException(e);
} finally {
IOUtil.close(rs, ps, conn);
}
}
本文后面相似的DB操作不再赘述。
2)校验全局事务会话GlobalSession
全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
if (!globalSession.isActive()) {
throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
"Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
globalSession.getXid(), globalSession.getStatus()));
}
if (globalSession.getStatus() != GlobalStatus.Begin) {
throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String
.format("Could not register branch into global session xid = %s status = %s while expecting %s",
globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));
}
}
3)分支事务会话BranchSession加全局锁
1> SessionHolder首先构建一个BranchSession
BranchSession的内容包括:全局事务xid、全局事务id、根据雪花算法生成的分支事务id、全局事务模式、RM资源Id、分支事务要加的全局锁keys、RM客户端ID、RM应用名;
public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
String applicationData, String lockKeys, String clientId) {
BranchSession branchSession = new BranchSession();
branchSession.setXid(globalSession.getXid());
branchSession.setTransactionId(globalSession.getTransactionId());
branchSession.setBranchId(UUIDGenerator.generateUUID());
branchSession.setBranchType(branchType);
branchSession.setResourceId(resourceId);
branchSession.setLockKey(lockKeys);
branchSession.setClientId(clientId);
branchSession.setApplicationData(applicationData);
return branchSession;
}
2> branchID放入ThreadLocal
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
3> *给分支事务加全局锁
AbstractCore中的branchSessionLock()方法没有具体的实现,并且在Seata中只有AT模式有全局锁的概念,因此只需要看ATCore的branchSessionLock()方法即可;
默认情况下seata在给分支事务加全局锁的同时,会检查全局锁是否冲突;
LockStoreDataBaseDAO#acquireLock()获取全局锁
LockStoreDataBaseDAO#acquireLock()方法如下:
@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
// 已经存在的行锁key集合
Set<String> dbExistedRowKeys = new HashSet<>();
boolean originalAutoCommit = true;
if (lockDOs.size() > 1) {
lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
}
try {
// 从全局锁数据源里获取到一个连接
conn = lockStoreDataSource.getConnection();
// 把自动提交事务关闭
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
List<LockDO> unrepeatedLockDOs = lockDOs;
//check lock
// 是否跳过锁检查
if (!skipCheckLock) {
boolean canLock = true;
//query,针对全局锁表查询某个数据
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
rs = ps.executeQuery();
// 获取到当前要加全局锁的事务xid
String currentXID = lockDOs.get(0).getXid();
boolean failFast = false;
// 查询结果为空时,说明没有事务加全局锁
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
// 如果加全局锁的是其他的全局事务xid
if (!StringUtils.equals(dbXID, currentXID)) {
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
}
if (!autoCommit) {
int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
if (status == LockStatus.Rollbacking.getCode()) {
failFast = true;
}
}
canLock = false;
break;
}
dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
// 不可以加全局锁,全局锁已经被其他事务占用
if (!canLock) {
conn.rollback();
if (failFast) {
throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
}
return false;
}
// If the lock has been exists in db, remove it from the lockDOs
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
}
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
conn.rollback();
return true;
}
}
// lock
if (unrepeatedLockDOs.size() == 1) {
LockDO lockDO = unrepeatedLockDOs.get(0);
// 加全局锁
if (!doAcquireLock(conn, lockDO)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
}
conn.rollback();
return false;
}
} else {
// 批量加全局锁
if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
}
conn.rollback();
return false;
}
}
conn.commit();
return true;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(rs, ps);
if (conn != null) {
try {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
} catch (SQLException e) {
}
}
}
}
增加全局行锁、检查全局锁冲突的逻辑如下:
- 先对要加的全局行锁去重,然后关闭数据库连接的自动提交;
- 如果跳过了全局锁冲突检查,则直接持久化全局行锁,然后提交全局锁数据持久化事务;
如果需要进行全局锁冲突检查:
首先根据分支事务传入的全局行锁构建查询全局锁的SQL;
SQL模板(要上几个行锁,in后面就有几个?)
select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified,status from lock_table where row_key in ( ? ) order by status desc
行锁的key值(数据库连接URL + 表明 + 主键id)
jdbc:mysql://127.0.0.1:3306/seata_stock^^^stock_tbl^^^1
- 如果根据查询全局行锁SQL没有从DB中查出记录,说明没有其他事务加当前分支事务所需要的全局行锁;
则直接持久化全局行锁,然后提交全局锁数据持久化事务;
- 如果根据查询全局行锁SQL从DB中查出了记录,并且加全局锁的全局事务xid不是当前全局事务的,则说明全局锁已经被其他全局事务占用;
进而回滚当前
提交全局锁数据持久化事务
,返回false,表示加全局锁失败;方法返回到ATCore#branchSessionLock()方法中,如果加全局锁失败,则直接抛出异常
BranchTransactionException
。
所谓的加全局锁操作,其实就是针对每一行记录 持久化一条行锁记录到lock_table
表中:
protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
PreparedStatement ps = null;
try {
//insert
String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
ps = conn.prepareStatement(insertLockSQL);
// 全局事务xid
ps.setString(1, lockDO.getXid());
ps.setLong(2, lockDO.getTransactionId());
// 分支事务ID
ps.setLong(3, lockDO.getBranchId());
ps.setString(4, lockDO.getResourceId());
ps.setString(5, lockDO.getTableName());
// 主键
ps.setString(6, lockDO.getPk());
// rowKey
ps.setString(7, lockDO.getRowKey());
// 锁状态:Locked(已加锁)
ps.setInt(8, LockStatus.Locked.getCode());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps);
}
}
4)分支事务添加到全局事务
1> 持久化分支事务
将分支事务注册到全局事务之后,会触发Session生命周期监听器SessionLifecycleListener
的onAddBranch()
事件;
// AbstractSessionManager 将 分支事务会话持久化到DB中
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onAddBranch(this, branchSession);
}
在此处,SessionLifecycleListener只有一个实现AbstractSessionManager:
DataBaseTransactionStoreManager是store.mode为db时的事务存储管理器,其writeSession()方法负责持久化全局事务、分支事务;
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
前面也提到过LogStore可以看做是封装了JDBC、操作DB的工具类 / DAO层,其只有一个实现LogStoreDataBaseDAO;
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(index++, globalTransactionDO.getXid());
ps.setLong(index++, globalTransactionDO.getTransactionId());
ps.setInt(index++, globalTransactionDO.getStatus());
ps.setString(index++, globalTransactionDO.getApplicationId());
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
ps.setString(index++, transactionName);
ps.setInt(index++, globalTransactionDO.getTimeout());
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
这里只是单纯的利用JDBC对数据做持久化,将数据持久化到branch_table
。
2> JVM层面分支事务添加到全局事务
四、总结
无论是AT、TCC、XA、SAGA哪种分布式事务模式,分支事务注册到全局事务的方式都一样;
TC中对RM注册分支事务到全局事务的处理逻辑为:
- 首先根据xid从DB中找到全局事务会话(这里会做一个DB查询操作);
- 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
- 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常
BranchTransactionException
; - 将分支事务会话添加到全局事务会话,持久化分支事务会话;
TC在分支事务注册的同时,会同时增加全局行锁、检查全局锁冲突。