前言
兄弟们,刚刚又给seata社区修了一个BUG
,有用户提了issue反应TransactionHook在某些情况下不会被调用:
相关issue链接:github.com/seata/seata…,该用户在issue中已经指出了相关问题所在:
下面我们来看一下到底是什么原因导致了上述BUG
的产生。
问题定位
根据用户的反馈,我们找到目标源码io.seata.tm.api.TransactionalTemplate#execute()
:
try { // 开启分布式事务,获取XID beginTransaction(txInfo, tx); Object rs; try { // 执行业务代码 rs = business.execute(); } catch (Throwable ex) { // 3. 处理异常,准备回滚. completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. 提交事务. commitTransaction(tx, txInfo); return rs; } finally { //5. 回收现场 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } 复制代码
问题代码就出在cleanUp()
中,我们来看一下里面做了什么操作,最终我们定位到:
public final class TransactionHookManager { private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>(); // 注册TransactionHook public static void registerHook(TransactionHook transactionHook) { if (transactionHook == null) { throw new NullPointerException("transactionHook must not be null"); } List<TransactionHook> transactionHooks = LOCAL_HOOKS.get(); if (transactionHooks == null) { LOCAL_HOOKS.set(new ArrayList<>()); } LOCAL_HOOKS.get().add(transactionHook); } // 移除当前线程上所有TransactionHook public static void clear() { LOCAL_HOOKS.remove(); } } 复制代码
由上面的源码可知,cleanUp()
操作时把当前线程中的所有TransactionHook
都清除掉了。也就是说,假如事务A和事务B共用同一个线程,当事务B处理完毕后,调用了cleanUp()
回收现场时,把该线程当中存储的所有TransactionHook
全部清除掉了,导致事务A的生命周期中找不到该事务对应的TransactionHook
,从而产生了BUG
。
如何解决
通过与seata社区的大佬不断地沟通,最终敲定以下方案:
1.改造
TransactionHookManager.LOCAL_HOOKS
,把数据类型改成ThreadLocal<Map<String, List<TransactionHook>>>
,Map
中的key
对应分布式事务XID
;2.针对当前上下文中没有XID,那么
key
就为null
,因为HashMap
允许key
为null
;3.当用户查询指定
XID
下的hook
时,连同key
为null
对应的hook
也一起返回;
- 第一步比较好理解,因为事务A和事务B对应的
TransactionHook
没有被区分出来,所以造成了清理事务B的TransactionHook
时连同事务A的TransactionHook
一起被清除,那么我们修改数据结构来区分事务A和事务B的TransactionHook
,以便清理的时候不会造成误删; - 第二步为什么要针对没有XID的时候也要能设置
TransactionHook
,因为有这么一段代码:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { // 执行triggerBeforeBegin() triggerBeforeBegin(); // 注册分布式事务,生成XID tx.begin(txInfo.getTimeOut(), txInfo.getName()); // 执行triggerAfterBegin() triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } } 复制代码
- 上面的代码会产生一个问题,因为我们的
TransactionHook
依赖于XID
,但是triggerBeforeBegin()
执行的时候还没有产生XID
,所以为了能够在没有XID
的时候也能够让TransactionHook
生效,我们要有一个虚值key
来临时设置TransactionHook
; - 第三步的设计时为了在第二步的基础上,当事务开启后获取
XID
后,要保证XID
获取前注册的TransactionHook
也要生效,我们在通过XID
查询TransactionHook
时要把虚值key
对应的TransactionHook
也一起返回;
注意事项
在实际代码修改中,发现triggerAfterCommit()
、triggerAfterRollback()
、triggerAfterCompletion()
在被调用时始终拿不到对应的TransactionHook
,最终debug下来发现在调用这三个方法前,上下文中的XID
被解绑了,导致拿到的XID
为空。代码类似下面这样:
try { // 调用triggerBeforeCommit() triggerBeforeCommit(); // 提交事务,清除XID tx.commit(); if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) { throw new TransactionalExecutor.ExecutionException(tx, new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())), TransactionalExecutor.Code.TimeoutRollback); } // 调用triggerAfterCommit() triggerAfterCommit(); } catch (TransactionException txe) { // 4.1 Failed to commit throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } 复制代码
不过经过我的一番查找,发现GlobalTransaction
中是包含XID
属性的,所以果断从GlobalTransaction
对象中取XID
传进来。
修改后的代码如下:
try { // 调用triggerBeforeCommit() triggerBeforeCommit(); // 提交事务,清除XID tx.commit(); if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) { throw new TransactionalExecutor.ExecutionException(tx, new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())), TransactionalExecutor.Code.TimeoutRollback); } // 调用triggerAfterCommit() triggerAfterCommit(tx.getXid()); } catch (TransactionException txe) { // 4.1 Failed to commit throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } 复制代码
改造后的TransactionHookManager
public final class TransactionHookManager { private TransactionHookManager() { } private static final ThreadLocal<Map<String, List<TransactionHook>>> LOCAL_HOOKS = new ThreadLocal<>(); /** * get the current hooks * * @return TransactionHook list */ public static List<TransactionHook> getHooks() { String xid = RootContext.getXID(); return getHooks(xid); } /** * get hooks by xid * * @param xid * @return TransactionHook list */ public static List<TransactionHook> getHooks(String xid) { Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get(); if (hooksMap == null || hooksMap.isEmpty()) { return Collections.emptyList(); } List<TransactionHook> hooks = new ArrayList<>(); List<TransactionHook> localHooks = hooksMap.get(xid); if (StringUtils.isNotBlank(xid)) { List<TransactionHook> virtualHooks = hooksMap.get(null); if (virtualHooks != null && !virtualHooks.isEmpty()) { hooks.addAll(virtualHooks); } } if (localHooks != null && !localHooks.isEmpty()) { hooks.addAll(localHooks); } if (hooks.isEmpty()) { return Collections.emptyList(); } return Collections.unmodifiableList(hooks); } /** * add new hook * * @param transactionHook transactionHook */ public static void registerHook(TransactionHook transactionHook) { if (transactionHook == null) { throw new NullPointerException("transactionHook must not be null"); } Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get(); if (hooksMap == null) { hooksMap = new HashMap<>(); LOCAL_HOOKS.set(hooksMap); } String xid = RootContext.getXID(); List<TransactionHook> hooks = hooksMap.get(xid); if (hooks == null) { hooks = new ArrayList<>(); hooksMap.put(xid, hooks); } hooks.add(transactionHook); } /** * clear hooks by xid * * @param xid */ public static void clear(String xid) { Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get(); if (hooksMap == null || hooksMap.isEmpty()) { return; } hooksMap.remove(xid); if (StringUtils.isNotBlank(xid)) { hooksMap.remove(null); } }