开篇
这篇文章的目的是介绍Fescar的提交流程(Commit)和回滚流程(Rollback),这两个流程其实是Fescar中RM的核心逻辑,涉及和TC交互的流程。
由于RM和TC交互涉及到网络通信,所以这块我们暂时只关注RM端的处理流程而暂时忽略网络通信的过程,网络通信的过程值得通过一篇文章单独进行描述。
RM提交事务源码分析
RM提交事务流程
public class ConnectionProxy extends AbstractConnectionProxy {
public void commit() throws SQLException {
// 如果是在全局事务当中的流程
if (context.inGlobalTransaction()) {
try {
// 1、注册全局事务
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}
try {
if (context.hasUndoLog()) {
// 2、持久化回滚日志
UndoLogManager.flushUndoLogs(this);
}
// 3、提交本地事务
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
// 4、上报状态
report(true);
context.reset();
} else {
// 如果不在全局事务当中的流程
targetConnection.commit();
}
}
}
说明:
- RM的提交事务区分为在全局事务和不在全局事务两种场景。
- RM的提交事务不在全局事务的场景是直接通过数据库连接targetConnection.commit()提交事务。
- RM的提交事务在全局事务的场景中则按照:1、注册事务;2、持久化回滚日志;3、提交本地事务;4、上报状态。
- 注册事务register()。
- 持久化回滚日志UndoLogManager.flushUndoLogs(this)。
- 提交本地事务 targetConnection.commit()。
- 上报状态 report(true)。
RM 分支事务注册
public class ConnectionProxy extends AbstractConnectionProxy {
private void register() throws TransactionException {
Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.buildLockKeys());
context.setBranchId(branchId);
}
}
public class ConnectionContext {
public String buildLockKeys() {
if (lockKeysBuffer.isEmpty()) {
return null;
}
StringBuffer appender = new StringBuffer();
Iterator<String> iterable = lockKeysBuffer.iterator();
while (iterable.hasNext()) {
appender.append(iterable.next());
if (iterable.hasNext()) {
appender.append(";");
}
}
return appender.toString();
}
}
public class DataSourceManager implements ResourceManager {
private ResourceManagerInbound asyncWorker;
private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
this.asyncWorker = asyncWorker;
}
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId,
String xid, String lockKeys) throws TransactionException {
try {
BranchRegisterRequest request = new BranchRegisterRequest();
request.setTransactionId(XID.getTransactionId(xid));
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
BranchRegisterResponse response = (BranchRegisterResponse)
RmRpcClient.getInstance().sendMsgWithResponse(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TransactionException(response.getTransactionExceptionCode(),
"Response[" + response.getMsg() + "]");
}
return response.getBranchId();
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
} catch (RuntimeException rex) {
throw new TransactionException(TransactionExceptionCode.BranchRegisterFailed,
"Runtime", rex);
}
}
}
说明:
- 分支事务的注册过程是通过DataSourceManager的branchRegister()实现的。
- branchRegister的内部逻辑先组装BranchRegisterRequest并发送给TC。
- branchRegister的内部逻辑后接收TC的响应BranchRegisterResponse并返回执行结果。
- 核心的lockKeys的拼接逻辑是拼接所有lockKeysBuffer的内容,lockKeysBuffer是所有受影响行拼接的字符串,相当于整合二次索引的感觉。
RM 持久化回滚日志
public final class UndoLogManager {
private static final Logger LOGGER = LoggerFactory.getLogger(UndoLogManager.class);
private static String UNDO_LOG_TABLE_NAME = "undo_log";
private static String INSERT_UNDO_LOG_SQL = "INSERT INTO " + UNDO_LOG_TABLE_NAME + "\n" +
"\t(branch_id, xid, rollback_info, log_status, log_created, log_modified)\n" +
"VALUES (?, ?, ?, 0, now(), now())";
private static String DELETE_UNDO_LOG_SQL = "DELETE FROM " + UNDO_LOG_TABLE_NAME + "\n" +
"\tWHERE branch_id = ? AND xid = ?";
private static String SELECT_UNDO_LOG_SQL = "SELECT * FROM "
+ UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE";
private UndoLogManager() {
}
public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {
assertDbSupport(cp.getDbType());
ConnectionContext connectionContext = cp.getContext();
String xid = connectionContext.getXid();
long branchID = connectionContext.getBranchId();
// 组装分支回滚日志对象
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchID);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
// 序列化回滚日志对象
String undoLogContent = UndoLogParserFactory.getInstance().encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: " + undoLogContent);
}
// 保存数据库回滚日志对象
PreparedStatement pst = null;
try {
pst = cp.getTargetConnection().prepareStatement(INSERT_UNDO_LOG_SQL);
pst.setLong(1, branchID);
pst.setString(2, xid);
pst.setBlob(3, BlobUtils.string2blob(undoLogContent));
pst.executeUpdate();
} catch (Exception e) {
if (e instanceof SQLException) {
throw (SQLException) e;
} else {
throw new SQLException(e);
}
} finally {
if (pst != null) {
pst.close();
}
}
}
}
说明:
- 构建回滚日志的逻辑核心在于组装数据结果序列化后保存到数据表当中。
- 组装分支回滚日志对象。
- 序列化回滚日志对象。
- 保存数据库回滚日志对象。
RM分支事务状态汇报
public class ConnectionProxy extends AbstractConnectionProxy {
private void report(boolean commitDone) throws SQLException {
int retry = 5; // TODO: configure
while (retry > 0) {
try {
DataSourceManager.get().branchReport(context.getXid(), context.getBranchId(),
(commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed), null);
return;
} catch (Throwable ex) {
LOGGER.error("Failed to report [" + context.getBranchId() +
"/" + context.getXid() + "] commit done [" +
commitDone + "] Retry Countdown: " + retry);
retry--;
if (retry == 0) {
throw new SQLException("Failed to report branch status " + commitDone, ex);
}
}
}
}
}
public class DataSourceManager implements ResourceManager {
public void branchReport(String xid, long branchId, BranchStatus status, String applicationData)
throws TransactionException {
try {
BranchReportRequest request = new BranchReportRequest();
request.setTransactionId(XID.getTransactionId(xid));
request.setBranchId(branchId);
request.setStatus(status);
request.setApplicationData(applicationData);
BranchReportResponse response = (BranchReportResponse)
RmRpcClient.getInstance().sendMsgWithResponse(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TransactionException(response.getTransactionExceptionCode(),
"Response[" + response.getMsg() + "]");
}
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
} catch (RuntimeException rex) {
throw new TransactionException(TransactionExceptionCode.BranchReportFailed, "Runtime", rex);
}
}
说明:
- 分支事务的report过程是通过DataSourceManager的branchReport()实现的。
- branchReport的内部逻辑先组装BranchReportRequest并发送给TC。
- branchReport的内部逻辑后接收TC的响应BranchReportResponse并返回执行结果。
期待
下一篇文章会尝试分析RM回滚事务流程。