Fescar 全局锁介绍

简介: 开篇 这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。 如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。

开篇

 这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。

image

 如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。
 那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程中全局事务B会因为获取不到全局锁而处于等待状态。


Fescar 全局锁处理流程

RM 尝试获取全局锁

public class ConnectionProxy extends AbstractConnectionProxy {
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                // 1、向TC发起注册操作并检查是否能够获取全局锁
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }

            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                // 2、执行本地的事务的commit操作
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();

        } else {
            targetConnection.commit();
        }
    }

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(
                BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }
}

说明:

  • RM 执行本地事务提交操作在ConnectionProxy的commit()完成。
  • commit()的过程当中按照register->commit的流程执行。
  • register()过程RM会向TC发起注册请求判断是否能够获取全局锁
  • register()通过DataSourceManager的branchRegister()操作完成。


TC处理全局锁申请流程

public class DefaultCore implements Core {

    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        response.setTransactionId(request.getTransactionId());
        response.setBranchId(
            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                XID.generateXID(request.getTransactionId()), request.getLockKey()));

    }

    public Long branchRegister(BranchType branchType, String resourceId, 
                 String clientId, String xid, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

        BranchSession branchSession = new BranchSession();
        branchSession.setTransactionId(XID.getTransactionId(xid));
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setApplicationId(globalSession.getApplicationId());
        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);

        // 判断branchSession是否能够获取锁
        if (!branchSession.lock()) {
            throw new TransactionException(LockKeyConflict);
        }
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            throw new TransactionException(FailedToAddBranch);

        }
        return branchSession.getBranchId();
    }

    public boolean lock() throws TransactionException {
        return LockManagerFactory.get().acquireLock(this);
    }

}

说明:

  • TC 在处理branchRegister()的过程中会判断branchResiter请求携带的session信息能否获取全局锁。
  • branchSession.lock()判断能否获取锁,如果获取失败则抛出TransactionException(LockKeyConflict)异常。
  • branchSession.lock()判断能否获取锁,如果获取成功则将branchSession添加到全局锁当中。


TC 判断全局锁分配流程

public class DefaultLockManagerImpl implements LockManager {

    public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        //1、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
            dbLockMap = LOCK_MAP.get(resourceId);
        }


        ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
        
        // 2、获取branchSession的全局锁的key对象
        String lockKey = branchSession.getLockKey();
        if(StringUtils.isEmpty(lockKey)) {
            return true;
        }
        
        // 3、按照分号“;”切割多个LockKey,每个LockKey按照table:pk1;pk2;pk3格式组装。
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                branchSession.unlock();
                throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
            }
            // 4、分割获取branchRegister请求的表名和pks。
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            // 5、获取表下的已经加锁的记录tableLockMap 
            ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
                tableLockMap = dbLockMap.get(tableName);
            }
            // 6、遍历该表所有pks判断是否已加锁。
            String[] pks = mergedPKs.split(",");
            for (String pk : pks) {
                // 7、同一个表的pk按照hash值进行hash分配到tableLockMap当中。
                int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                if (bucketLockMap == null) {
                    tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
                    bucketLockMap = tableLockMap.get(bucketId);
                }
                // 8、根据pk去获取bucketLockMap当中获取锁对象。
                synchronized (bucketLockMap) {
                    Long lockingTransactionId = bucketLockMap.get(pk);
                    if (lockingTransactionId == null) {
                        // No existing lock
                        // 9、将锁添加到branchSession当中
                        bucketLockMap.put(pk, transactionId);
                        Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                        if (keysInHolder == null) {
                            bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                            keysInHolder = bucketHolder.get(bucketLockMap);
                        }
                        keysInHolder.add(pk);

                    } else if (lockingTransactionId.longValue() == transactionId) {
                        // Locked by me
                        continue;
                    } else {
                        // 直接返回异常
                        LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
                        branchSession.unlock(); // Release all acquired locks.
                        return false;
                    }
                }
            }
        }
        return true;
    }
}

说明:

  • TC 判断branchRegister操作能否获取锁按照维度层层递进判断。
  • 1、先从ResourceId的维度进行判断, LOCK_MAP.get(resourceId)。
  • 2、再从tableName的维度进行判断, dbLockMap.get(tableName)。
  • 3、再从pk的hashcode的维度进行判断,tableLockMap.get(bucketId)。
  • 4、在从pk的维度进行判断,bucketLockMap.get(pk)。
  • 5、如果按照上述维度判断后未存在加锁的branchSession就返回能够加锁,否则返回不能加锁。
  • 6、判断加锁过程中处理了幂等,如果自己已经加锁了可以再次获取锁。


TM 全局锁维护数据结构

private static final ConcurrentHashMap<String, 
                     ConcurrentHashMap<String, 
                     ConcurrentHashMap<Integer, 
                     Map<String, Long>>>> LOCK_MAP = 
                     new ConcurrentHashMap<String, 
                         ConcurrentHashMap<String, 
                        ConcurrentHashMap<Integer, 
                        Map<String, Long>>>>();

说明:

  • LOCK_MAP 作为TC全局锁的保存结构。
  • 第一层ConcurrentHashMap的key为resourceId。
  • 第二层ConcurrentHashMap的key为tableName。
  • 第三层ConcurrentHashMap的key为pk的hashCode。
  • 第四层的Map的key为pk,value为transactionId(RM携带过来)。


参考文章


Fescar源码分析连载

目录
相关文章
原理篇:Seata TCC模式是如何解决幂等性、资源悬挂、空回滚问题的
原理篇:Seata TCC模式是如何解决幂等性、资源悬挂、空回滚问题的
1130 0
|
2月前
|
NoSQL 前端开发 Java
【幂等性大坑】事务提交前释放锁导致锁失效问题
在事务中,使用了 Redis 分布式锁.这个方法一旦执行,事务生效,接着就 Redis 分布式锁生效,代码执行完后,先释放 Redis 分布式锁,然后再提交事务数据,最后事务结束。如果是表单重复提交场景,可以尝试给“订单号”等有唯一性的字段加唯一索引,这样重复提交时会因为唯一索引约束导致索引失效。5、如果表的一个字段,要作为另外一个表的外键,这个字段必须有唯一约束(或是主键),如果只是有唯一索引,就会报错。2、创建唯一约束,会自动创建一个同名的唯一索引,该索引不能单独删除,删除约束会自动删除索引。
【幂等性大坑】事务提交前释放锁导致锁失效问题
|
3月前
|
运维 监控 关系型数据库
全局事物服务GTS
【8月更文挑战第22天】
33 0
|
5月前
|
SQL 关系型数据库 MySQL
MySQL数据库——锁-概述以及全局锁(介绍、语法、特点)
MySQL数据库——锁-概述以及全局锁(介绍、语法、特点)
79 0
|
6月前
|
Java 数据库连接 API
seata回滚问题之全局异常如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
869 7
|
6月前
|
NoSQL Java 数据库
分布式事务的锁
分布式事务的锁
38 0
分布式事务的锁
|
SQL 存储 Oracle
19 PostgreSQL 锁类型,锁模式,锁冲突,死锁检测的介绍|学习笔记
快速学习19 PostgreSQL 锁类型,锁模式,锁冲突,死锁检测的介绍
702 0
19 PostgreSQL 锁类型,锁模式,锁冲突,死锁检测的介绍|学习笔记
|
JSON 算法 安全
全局锁,锁住怎么办???
全局锁,锁住怎么办???
|
SQL 存储 算法
MySQL的锁机制,包括锁分类、锁级别、锁粒度、锁冲突等方面
MySQL的锁机制,包括锁分类、锁级别、锁粒度、锁冲突等方面
146 0