- 第一种情况:当任意RM的业务处理出现异常,都会触发TM发起全局事务的回滚,相关的回滚指令由TM下发给TC,最终TC把回滚指令依次下发给所有的RM,通过所有分支事务的回滚达到全局事务回滚的目的;
- 第二种情况:当所有RM都成功提交分支事务后,TM发起全局事务提交指令给到TC服务,TC收到指令后,同样会依次调用所有RM发起分支事务的提交,以便达到全局事务提交的目的;
public class DefaultRMHandler extends AbstractRMHandler { // 接收TC下发的提交请求 @Override public BranchCommitResponse handle(BranchCommitRequest request) { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId())); return getRMHandler(request.getBranchType()).handle(request); } // 接收TC下发的回滚请求 @Override public BranchRollbackResponse handle(BranchRollbackRequest request) { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId())); return getRMHandler(request.getBranchType()).handle(request); } } // 上面的提交或回滚请求,最终会调用到AbstractRMHandler里面的逻辑 public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler { // 下面使用到了最熟悉的模版模式 @Override public BranchCommitResponse handle(BranchCommitRequest request) { BranchCommitResponse response = new BranchCommitResponse(); // 模版模式 exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() { @Override public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { doBranchCommit(request, response); } }, request, response); return response; } // 回滚也是一样,使用模版模式 @Override public BranchRollbackResponse handle(BranchRollbackRequest request) { BranchRollbackResponse response = new BranchRollbackResponse(); exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() { @Override public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { doBranchRollback(request, response); } }, request, response); return response; } } 复制代码
类的处理逻辑中,最核心的逻辑在doBranchCommit(request, response)
和doBranchRollback(request, response)
AbstractRMHandler.doBranchCommit(request, response)
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { // 拆解请求参数 String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData); } // 执行分支事务提交 BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); // 返回响应结果 response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch commit result: " + status); } } 复制代码
public class DataSourceManager extends AbstractResourceManager { @Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 异步提交 return asyncWorker.branchCommit(xid, branchId, resourceId); } } 复制代码
public class AsyncWorker { public AsyncWorker(DataSourceManager dataSourceManager) { this.dataSourceManager = dataSourceManager; LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT); commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT); ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true); scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory); // 创建定时任务,每秒中调用doBranchCommitSafely提交分支事务 scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS); } // 异步提交分支事务 public BranchStatus branchCommit(String xid, long branchId, String resourceId) { Phase2Context context = new Phase2Context(xid, branchId, resourceId); // 把分支事务放进阻塞队列中 addToCommitQueue(context); return BranchStatus.PhaseTwo_Committed; } } 复制代码
void doBranchCommitSafely() { try { // 提交分支事务 doBranchCommit(); } catch (Throwable e) { LOGGER.error("Exception occur when doing branch commit", e); } } // 真正干活的代码 private void doBranchCommit() { // 如果阻塞队列中没有需要提交的分支事务,那么直接返回 if (commitQueue.isEmpty()) { return; } // 这里把阻塞队列中的数据放到了allContexts中,属于CopyOnWrite的思想,下面就可以专注操作allContexts List<Phase2Context> allContexts = new LinkedList<>(); commitQueue.drainTo(allContexts); // 通过resourceId对Phase2Context进行分组 Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts); // 循环调用dealWithGroupedContexts groupedContexts.forEach(this::dealWithGroupedContexts); } private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) { if (StringUtils.isBlank(resourceId)) { //ConcurrentHashMap required notNull key LOGGER.warn("resourceId is empty and will skip."); return; } // 通过resourceId获取DataSourceProxy DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId); if (dataSourceProxy == null) { LOGGER.warn("failed to find resource for {} and requeue", resourceId); // 如果没拿到DataSourceProxy,重新放进阻塞队列中,等待下次提交 addAllToCommitQueue(contexts); return; } Connection conn = null; try { // 获取原生Connection,这里就不需要代理Connection了 conn = dataSourceProxy.getPlainConnection(); UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()); // 按照每次最多1000个Phase2Context来分割所有的Phase2Context List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE); // 删除该分支事务对应的所有的undolog for (List<Phase2Context> partition : splitByLimit) { deleteUndoLog(conn, undoLogManager, partition); } } catch (SQLException sqlExx) { //如果出了异常,那么重新回到阻塞队列中,等待下次提交 addAllToCommitQueue(contexts); LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx); } finally { // 释放链接 IOUtil.close(conn); } } // 真正的删除undolog逻辑 private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) { Set<String> xids = new LinkedHashSet<>(contexts.size()); Set<Long> branchIds = new LinkedHashSet<>(contexts.size()); contexts.forEach(context -> { xids.add(context.xid); branchIds.add(context.branchId); }); try { // 构建删除undolog的sql语句,并执行 undoLogManager.batchDeleteUndoLog(xids, branchIds, conn); if (!conn.getAutoCommit()) { // 提交删除结果 conn.commit(); } } catch (SQLException e) { LOGGER.error("Failed to batch delete undo log", e); try { // 出异常,就回滚,并重新加入待提交队列 conn.rollback(); addAllToCommitQueue(contexts); } catch (SQLException rollbackEx) { LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx); } } } 复制代码
- 分支事务的提交其实是定时任务异步提交的,RM中的
每秒中会对阻塞队列中的待提交分支事务进行提交;- 分支事务的提交其实就是简单地删除对应的undolog日志即可;
AbstractRMHandler.doBranchRollback(request, response)
/** * Do branch rollback. * * @param request the request * @param response the response * @throws TransactionException the transaction exception */ protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId); } BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch Rollbacked result: " + status); } } 复制代码
public class DataSourceManager extends AbstractResourceManager { // 分支事务回滚 @Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 获取DataSourceProxy DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId)); } try { // 回滚主要逻辑在undo()里面 UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { StackTraceLogger.info(LOGGER, te, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()}); // 回滚失败,直接返回,并告知是否可以重试 if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } // 回滚成功 return BranchStatus.PhaseTwo_Rollbacked; } } 复制代码
@Override public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { Connection conn = null; ResultSet rs = null; PreparedStatement selectPST = null; boolean originalAutoCommit = true; // 回滚的逻辑使用了死循环,直至回滚成功或失败 for (; ; ) { try { // DataSourceProxy主要是为了拿到原生Connection conn = dataSourceProxy.getPlainConnection(); // 改成手动提交 if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } // 查询undolog selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); rs = selectPST.executeQuery(); boolean exists = false; // 判断是否有undolog while (rs.next()) { // 进入while循环,说明一定有undolog exists = true; int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS); // 判断当前undolog是否可以操作 if (!canUndo(state)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state); } // 不可用操作就返回 return; } String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT); Map<String, String> context = parseContext(contextString); byte[] rollbackInfo = getRollbackInfo(rs); String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY); UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer); // 最终通过一系列的解析,最终得到BranchUndoLog实体对象 BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); try { // put serializer name to local setCurrentSerializer(parser.getName()); List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); if (sqlUndoLogs.size() > 1) { Collections.reverse(sqlUndoLogs); } // 循环执行回滚操作 for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta( conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); // 回滚逻辑主要在这里面 undoExecutor.executeOn(conn); } } finally { // remove serializer name removeCurrentSerializer(); } } // 回滚完成后,删除undolog,并提交事务 if (exists) { deleteUndoLog(xid, branchId, conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name()); } } else { // 如果没有找到undolog日志,说明可能在分支事务注册的时候,undolog还没来得及添加,因为超时或者其他分支事务异常触发了TM发起全局事务回滚,所以这里立马插入一条记录,以防undolog被RM插入 insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name()); } } return; } catch (SQLIntegrityConstraintViolationException e) { // 产生SQLIntegrityConstraintViolationException异常后,将重新循环执行回滚 if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId); } } catch (Throwable e) { // 如果产生其他异常,那么将回滚前面的操作,并抛出异常跳出循环 if (conn != null) { try { conn.rollback(); } catch (SQLException rollbackEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); } } throw new BranchTransactionException(BranchRollbackFailed_Retriable, String .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e); } finally { // 恢复现场 try { if (rs != null) { rs.close(); } if (selectPST != null) { selectPST.close(); } if (conn != null) { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx); } } } } 复制代码
- 如果找到undolog,那么执行回滚操作;
- 如果没有undolog,那么先插入一条记录标记,以防undolog被插入;如果标记插入失败,说明undolog已经被其他线程插入成功了,那么重新循环执行回滚操作;
public void executeOn(Connection conn) throws SQLException { // 如果开启校验undo日志,并且数据校验失败,不能执行回滚 // 主要目的是为了确认当前数据和后镜像是否一致,否则就是数据被污染了,不能回滚 if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) { return; } // 下面才是真正的回滚逻辑 PreparedStatement undoPST = null; try { // 构建undoSQL,这里用到了策略模式 String undoSQL = buildUndoSQL(); // 获取对应的prepareStatement undoPST = conn.prepareStatement(undoSQL); // 构建反向sql TableRecords undoRows = getUndoRows(); for (Row undoRow : undoRows.getRows()) { ArrayList<Field> undoValues = new ArrayList<>(); List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn)); for (Field field : undoRow.getFields()) { if (field.getKeyType() != KeyType.PRIMARY_KEY) { undoValues.add(field); } } // 设置参数 undoPrepare(undoPST, undoValues, pkValueList); // 执行回滚 undoPST.executeUpdate(); } } catch (Exception ex) { if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } finally { //important for oracle IOUtil.close(undoPST); } } 复制代码
- 分支事务的回滚不是异步的;
- 在回滚的过程中,默认是需要校验数据是否被污染;在有完全把握的情况下,开发人员也可以自己配置成回滚不校验以提升效率