Fescar 全局锁介绍

简介: 开篇 这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。 如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。

开篇

 这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。

image

 如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。
 那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程中全局事务B会因为获取不到全局锁而处于等待状态。


Fescar 全局锁处理流程

RM 尝试获取全局锁

public class ConnectionProxy extends AbstractConnectionProxy {
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                // 1、向TC发起注册操作并检查是否能够获取全局锁
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }

            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                // 2、执行本地的事务的commit操作
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();

        } else {
            targetConnection.commit();
        }
    }

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(
                BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }
}

说明:

  • RM 执行本地事务提交操作在ConnectionProxy的commit()完成。
  • commit()的过程当中按照register->commit的流程执行。
  • register()过程RM会向TC发起注册请求判断是否能够获取全局锁
  • register()通过DataSourceManager的branchRegister()操作完成。


TC处理全局锁申请流程

public class DefaultCore implements Core {

    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        response.setTransactionId(request.getTransactionId());
        response.setBranchId(
            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                XID.generateXID(request.getTransactionId()), request.getLockKey()));

    }

    public Long branchRegister(BranchType branchType, String resourceId, 
                 String clientId, String xid, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

        BranchSession branchSession = new BranchSession();
        branchSession.setTransactionId(XID.getTransactionId(xid));
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setApplicationId(globalSession.getApplicationId());
        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);

        // 判断branchSession是否能够获取锁
        if (!branchSession.lock()) {
            throw new TransactionException(LockKeyConflict);
        }
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            throw new TransactionException(FailedToAddBranch);

        }
        return branchSession.getBranchId();
    }

    public boolean lock() throws TransactionException {
        return LockManagerFactory.get().acquireLock(this);
    }

}

说明:

  • TC 在处理branchRegister()的过程中会判断branchResiter请求携带的session信息能否获取全局锁。
  • branchSession.lock()判断能否获取锁,如果获取失败则抛出TransactionException(LockKeyConflict)异常。
  • branchSession.lock()判断能否获取锁,如果获取成功则将branchSession添加到全局锁当中。


TC 判断全局锁分配流程

public class DefaultLockManagerImpl implements LockManager {

    public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        //1、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
            dbLockMap = LOCK_MAP.get(resourceId);
        }


        ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
        
        // 2、获取branchSession的全局锁的key对象
        String lockKey = branchSession.getLockKey();
        if(StringUtils.isEmpty(lockKey)) {
            return true;
        }
        
        // 3、按照分号“;”切割多个LockKey,每个LockKey按照table:pk1;pk2;pk3格式组装。
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                branchSession.unlock();
                throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
            }
            // 4、分割获取branchRegister请求的表名和pks。
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            // 5、获取表下的已经加锁的记录tableLockMap 
            ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
                tableLockMap = dbLockMap.get(tableName);
            }
            // 6、遍历该表所有pks判断是否已加锁。
            String[] pks = mergedPKs.split(",");
            for (String pk : pks) {
                // 7、同一个表的pk按照hash值进行hash分配到tableLockMap当中。
                int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                if (bucketLockMap == null) {
                    tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
                    bucketLockMap = tableLockMap.get(bucketId);
                }
                // 8、根据pk去获取bucketLockMap当中获取锁对象。
                synchronized (bucketLockMap) {
                    Long lockingTransactionId = bucketLockMap.get(pk);
                    if (lockingTransactionId == null) {
                        // No existing lock
                        // 9、将锁添加到branchSession当中
                        bucketLockMap.put(pk, transactionId);
                        Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                        if (keysInHolder == null) {
                            bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                            keysInHolder = bucketHolder.get(bucketLockMap);
                        }
                        keysInHolder.add(pk);

                    } else if (lockingTransactionId.longValue() == transactionId) {
                        // Locked by me
                        continue;
                    } else {
                        // 直接返回异常
                        LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
                        branchSession.unlock(); // Release all acquired locks.
                        return false;
                    }
                }
            }
        }
        return true;
    }
}

说明:

  • TC 判断branchRegister操作能否获取锁按照维度层层递进判断。
  • 1、先从ResourceId的维度进行判断, LOCK_MAP.get(resourceId)。
  • 2、再从tableName的维度进行判断, dbLockMap.get(tableName)。
  • 3、再从pk的hashcode的维度进行判断,tableLockMap.get(bucketId)。
  • 4、在从pk的维度进行判断,bucketLockMap.get(pk)。
  • 5、如果按照上述维度判断后未存在加锁的branchSession就返回能够加锁,否则返回不能加锁。
  • 6、判断加锁过程中处理了幂等,如果自己已经加锁了可以再次获取锁。


TM 全局锁维护数据结构

private static final ConcurrentHashMap<String, 
                     ConcurrentHashMap<String, 
                     ConcurrentHashMap<Integer, 
                     Map<String, Long>>>> LOCK_MAP = 
                     new ConcurrentHashMap<String, 
                         ConcurrentHashMap<String, 
                        ConcurrentHashMap<Integer, 
                        Map<String, Long>>>>();

说明:

  • LOCK_MAP 作为TC全局锁的保存结构。
  • 第一层ConcurrentHashMap的key为resourceId。
  • 第二层ConcurrentHashMap的key为tableName。
  • 第三层ConcurrentHashMap的key为pk的hashCode。
  • 第四层的Map的key为pk,value为transactionId(RM携带过来)。


参考文章


Fescar源码分析连载

目录
相关文章
|
数据采集 IDE 编译器
STM32微控制器入门及应用实例
STM32微控制器入门及应用实例
|
机器学习/深度学习 大数据 计算机视觉
【YOLOv8改进 - 特征融合】 GELAN:YOLOV9 通用高效层聚合网络,高效且涨点
YOLOv8专栏探讨了深度学习中信息瓶颈问题,提出可编程梯度信息(PGI)和广义高效层聚合网络(GELAN),改善轻量级模型的信息利用率。GELAN在MS COCO数据集上表现优越,且PGI适用于不同规模的模型,甚至能超越预训练SOTA。[论文](https://arxiv.org/pdf/2402.13616)和[代码](https://github.com/WongKinYiu/yolov9)已开源。核心组件RepNCSPELAN4整合了RepNCSP块和卷积。更多详情及配置参见相关链接。
|
机器学习/深度学习 分布式计算 并行计算
探索UCI心脏病数据:利用R语言和h2o深度学习构建预测模型
本文的研究目的是基于UCI心脏病数据集[1],利用R语言和h2o深度学习框架构建一个预测模型,旨在准确预测个体患心脏病的风险。通过使用该模型,医疗专业人员可以更好地进行早期干预和预防措施,从而提高患者的生活质量和健康状况。
1239 0
|
8天前
|
云安全 监控 安全
|
13天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1451 8
|
7天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
482 11