开篇
这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。
如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。
那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程中全局事务B会因为获取不到全局锁而处于等待状态。
Fescar 全局锁处理流程
RM 尝试获取全局锁
public class ConnectionProxy extends AbstractConnectionProxy {
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
try {
// 1、向TC发起注册操作并检查是否能够获取全局锁
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}
try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
// 2、执行本地的事务的commit操作
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
report(true);
context.reset();
} else {
targetConnection.commit();
}
}
private void register() throws TransactionException {
Long branchId = DataSourceManager.get().branchRegister(
BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.buildLockKeys());
context.setBranchId(branchId);
}
}
说明:
- RM 执行本地事务提交操作在ConnectionProxy的commit()完成。
- commit()的过程当中按照register->commit的流程执行。
- register()过程RM会向TC发起注册请求判断是否能够获取全局锁。
- register()通过DataSourceManager的branchRegister()操作完成。
TC处理全局锁申请流程
public class DefaultCore implements Core {
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
RpcContext rpcContext) throws TransactionException {
response.setTransactionId(request.getTransactionId());
response.setBranchId(
core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
XID.generateXID(request.getTransactionId()), request.getLockKey()));
}
public Long branchRegister(BranchType branchType, String resourceId,
String clientId, String xid, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
BranchSession branchSession = new BranchSession();
branchSession.setTransactionId(XID.getTransactionId(xid));
branchSession.setBranchId(UUIDGenerator.generateUUID());
branchSession.setApplicationId(globalSession.getApplicationId());
branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
branchSession.setBranchType(branchType);
branchSession.setResourceId(resourceId);
branchSession.setLockKey(lockKeys);
branchSession.setClientId(clientId);
// 判断branchSession是否能够获取锁
if (!branchSession.lock()) {
throw new TransactionException(LockKeyConflict);
}
try {
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
throw new TransactionException(FailedToAddBranch);
}
return branchSession.getBranchId();
}
public boolean lock() throws TransactionException {
return LockManagerFactory.get().acquireLock(this);
}
}
说明:
- TC 在处理branchRegister()的过程中会判断branchResiter请求携带的session信息能否获取全局锁。
- branchSession.lock()判断能否获取锁,如果获取失败则抛出TransactionException(LockKeyConflict)异常。
- branchSession.lock()判断能否获取锁,如果获取成功则将branchSession添加到全局锁当中。
TC 判断全局锁分配流程
public class DefaultLockManagerImpl implements LockManager {
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
String resourceId = branchSession.getResourceId();
long transactionId = branchSession.getTransactionId();
//1、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。
ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
if (dbLockMap == null) {
LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
dbLockMap = LOCK_MAP.get(resourceId);
}
ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
// 2、获取branchSession的全局锁的key对象
String lockKey = branchSession.getLockKey();
if(StringUtils.isEmpty(lockKey)) {
return true;
}
// 3、按照分号“;”切割多个LockKey,每个LockKey按照table:pk1;pk2;pk3格式组装。
String[] tableGroupedLockKeys = lockKey.split(";");
for (String tableGroupedLockKey : tableGroupedLockKeys) {
int idx = tableGroupedLockKey.indexOf(":");
if (idx < 0) {
branchSession.unlock();
throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
}
// 4、分割获取branchRegister请求的表名和pks。
String tableName = tableGroupedLockKey.substring(0, idx);
String mergedPKs = tableGroupedLockKey.substring(idx + 1);
// 5、获取表下的已经加锁的记录tableLockMap
ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
if (tableLockMap == null) {
dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
tableLockMap = dbLockMap.get(tableName);
}
// 6、遍历该表所有pks判断是否已加锁。
String[] pks = mergedPKs.split(",");
for (String pk : pks) {
// 7、同一个表的pk按照hash值进行hash分配到tableLockMap当中。
int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
if (bucketLockMap == null) {
tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
bucketLockMap = tableLockMap.get(bucketId);
}
// 8、根据pk去获取bucketLockMap当中获取锁对象。
synchronized (bucketLockMap) {
Long lockingTransactionId = bucketLockMap.get(pk);
if (lockingTransactionId == null) {
// No existing lock
// 9、将锁添加到branchSession当中
bucketLockMap.put(pk, transactionId);
Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
if (keysInHolder == null) {
bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
keysInHolder = bucketHolder.get(bucketLockMap);
}
keysInHolder.add(pk);
} else if (lockingTransactionId.longValue() == transactionId) {
// Locked by me
continue;
} else {
// 直接返回异常
LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
branchSession.unlock(); // Release all acquired locks.
return false;
}
}
}
}
return true;
}
}
说明:
- TC 判断branchRegister操作能否获取锁按照维度层层递进判断。
- 1、先从ResourceId的维度进行判断, LOCK_MAP.get(resourceId)。
- 2、再从tableName的维度进行判断, dbLockMap.get(tableName)。
- 3、再从pk的hashcode的维度进行判断,tableLockMap.get(bucketId)。
- 4、在从pk的维度进行判断,bucketLockMap.get(pk)。
- 5、如果按照上述维度判断后未存在加锁的branchSession就返回能够加锁,否则返回不能加锁。
- 6、判断加锁过程中处理了幂等,如果自己已经加锁了可以再次获取锁。
TM 全局锁维护数据结构
private static final ConcurrentHashMap<String,
ConcurrentHashMap<String,
ConcurrentHashMap<Integer,
Map<String, Long>>>> LOCK_MAP =
new ConcurrentHashMap<String,
ConcurrentHashMap<String,
ConcurrentHashMap<Integer,
Map<String, Long>>>>();
说明:
- LOCK_MAP 作为TC全局锁的保存结构。
- 第一层ConcurrentHashMap的key为resourceId。
- 第二层ConcurrentHashMap的key为tableName。
- 第三层ConcurrentHashMap的key为pk的hashCode。
- 第四层的Map的key为pk,value为transactionId(RM携带过来)。