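Introduction
This article explains how the three proxies DataSourceProxy, ConnectionProxy, and StatementProxy are implemented and how they relate to one another.
The core question it tries to answer is how these three proxies intercept calls while leaving the original JDBC data-access flow unchanged.
In my view, the RM's working principle only becomes clear once the mechanics of these three proxies are understood.
JDBC usage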
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Demo {
    // Database user name
    private static final String USERNAME = "root";
    // Database password
    private static final String PASSWORD = "123456";
    // JDBC driver class
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    // Database URL
    private static final String URL = "jdbc:mysql://localhost:3306/mydb";
    // Database connection
    private static Connection connection;
    // Object that executes the SQL statements
    private static PreparedStatement pstmt;
    // Result set returned by a query (unused in this example)
    private static ResultSet resultSet;

    public static void main(String[] args) throws SQLException {
        try {
            Class.forName(DRIVER);
            connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
            // To use a transaction, turn auto-commit off so that commits are manual
            connection.setAutoCommit(false);
            // Set the transaction isolation level
            connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            String sql1 = "insert into userinfo(username,pswd) values(?,?)";
            String sql2 = "update userinfo set pswd=? where username = ?";
            pstmt = connection.prepareStatement(sql1);
            pstmt.setString(1, "CAROL");
            pstmt.setString(2, "123");
            pstmt.executeUpdate();
            pstmt = connection.prepareStatement(sql2);
            pstmt.setString(1, "123456");
            pstmt.setString(2, "nicole");
            pstmt.executeUpdate();
            // Commit the transaction
            connection.commit();
        } catch (Exception e) {
            // Roll back if anything in the transaction failed;
            // the connection is still null if getConnection itself failed
            if (connection != null) {
                connection.rollback();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
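Notes:
- JDBC usage follows a fixed order: load the driver, obtain a connection, create a statement, execute the operation.
- The RM proxies follow essentially the same flow as plain JDBC usage.
- The example above also shows JDBC transaction handling: a transaction is scoped to a Connection.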
<bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
    <property name="url" value="${jdbc.account.url}"/>
    <property name="username" value="${jdbc.account.username}"/>
    <property name="password" value="${jdbc.account.password}"/>
    <property name="driverClassName" value="${jdbc.account.driver}"/>
    <property name="initialSize" value="0" />
    <property name="maxActive" value="180" />
    <property name="minIdle" value="0" />
    <property name="maxWait" value="60000" />
    <property name="validationQuery" value="Select 'x' from DUAL" />
    <property name="testOnBorrow" value="false" />
    <property name="testOnReturn" value="false" />
    <property name="testWhileIdle" value="true" />
    <property name="timeBetweenEvictionRunsMillis" value="60000" />
    <property name="minEvictableIdleTimeMillis" value="25200000" />
    <property name="removeAbandoned" value="true" />
    <property name="removeAbandonedTimeout" value="1800" />
    <property name="logAbandoned" value="true" />
    <property name="filters" value="mergeStat" />
</bean>
<bean id="accountDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
    <constructor-arg ref="accountDataSource" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="accountDataSourceProxy" />
</bean>
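Notes:
- The configuration above comes from dubbo-account-service.xml in the example project.
- Its core is the DruidDataSource bean accountDataSource, which is wrapped by the DataSourceProxy bean accountDataSourceProxy and then handed to JdbcTemplate.
- accountDataSource plays the same role as DriverManager in plain JDBC: it is where Connection objects come from.
- A Connection is obtained from the DataSource, and a Statement is obtained from the Connection.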
DataSourceProxy
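Notes:
- A DataSource is a factory for connections to the physical data source that the DataSource object represents; the interface is implemented by driver vendors.
- For an introduction to the DataSource API, see the post "javax.sql.DataSource之api学习".
- The abstract class AbstractDataSourceProxy implements most of the DataSource interface (everything except the Connection-related methods).
- DataSourceProxy implements the core getConnection methods and wraps the returned Connection in a proxy.
DataSourceProxy source code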
public abstract class AbstractDataSourceProxy implements DataSource {

    protected DruidDataSource targetDataSource;

    public AbstractDataSourceProxy(DruidDataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    public DruidDataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return targetDataSource.isWrapperFor(iface);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return targetDataSource.getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        targetDataSource.setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        targetDataSource.setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return targetDataSource.getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return targetDataSource.getParentLogger();
    }
}

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

    private String resourceGroupId = "DEFAULT";

    private boolean managed = false;

    // The proxy is created through the constructor, which receives the target DataSource
    public DataSourceProxy(DruidDataSource targetDataSource) {
        super(targetDataSource);
    }

    public DataSourceProxy(DruidDataSource targetDataSource, String resourceGroupId) {
        super(targetDataSource);
        this.resourceGroupId = resourceGroupId;
    }

    private void assertManaged() {
        if (!managed) {
            DataSourceManager.get().registerResource(this);
            managed = true;
        }
    }

    public Connection getPlainConnection() throws SQLException {
        return targetDataSource.getConnection();
    }

    public String getDbType() {
        return targetDataSource.getDbType();
    }

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        assertManaged();
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType());
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        assertManaged();
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType());
    }

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    @Override
    public String getResourceId() {
        return targetDataSource.getUrl();
    }
}
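Notes:
- AbstractDataSourceProxy implements the DataSource interface by delegating to targetDataSource.
- DataSourceProxy extends AbstractDataSourceProxy and overrides getConnection.
- The DataSourceProxy constructor takes a DruidDataSource targetDataSource.
- On its first getConnection call, DataSourceProxy registers itself with DataSourceManager (assertManaged).
- getConnection obtains a connection from the target DataSource and wraps it in a ConnectionProxy before returning it:
- Connection targetConnection = targetDataSource.getConnection(username, password)
- return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType())
- DataSourceProxy therefore returns ConnectionProxy objects.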
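The wrapping idea in these notes can be tried without Fescar or a database. The following is only a minimal sketch under stated assumptions: WrappingDataSource and DataSourceWrapDemo are hypothetical names, the stub DataSource and Connection are built with java.lang.reflect.Proxy purely so the example runs on its own, and the "extra behaviour" is reduced to recording method names. It is not the Fescar implementation.

```java
import java.io.PrintWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import javax.sql.DataSource;

/** Hypothetical wrapper (not Fescar code): only getConnection() adds behaviour. */
class WrappingDataSource implements DataSource {
    private final DataSource target;
    final List<String> calls = new ArrayList<>();   // names of Connection methods seen

    WrappingDataSource(DataSource target) {
        this.target = target;
    }

    @Override
    public Connection getConnection() throws SQLException {
        return wrap(target.getConnection());
    }

    @Override
    public Connection getConnection(String user, String password) throws SQLException {
        return wrap(target.getConnection(user, password));
    }

    // Hand back a Connection that records each call, then forwards it unchanged.
    private Connection wrap(Connection real) {
        return (Connection) Proxy.newProxyInstance(
                WrappingDataSource.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    try {
                        return method.invoke(real, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause();   // surface the original exception to the caller
                    }
                });
    }

    // Everything else is plain delegation to the target.
    @Override public PrintWriter getLogWriter() throws SQLException { return target.getLogWriter(); }
    @Override public void setLogWriter(PrintWriter out) throws SQLException { target.setLogWriter(out); }
    @Override public void setLoginTimeout(int seconds) throws SQLException { target.setLoginTimeout(seconds); }
    @Override public int getLoginTimeout() throws SQLException { return target.getLoginTimeout(); }
    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException { return target.getParentLogger(); }
    @Override public <T> T unwrap(Class<T> iface) throws SQLException { return target.unwrap(iface); }
    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return target.isWrapperFor(iface); }
}

class DataSourceWrapDemo {
    // Stub DataSource and Connection built from dynamic proxies, so no database is needed.
    static DataSource stubDataSource() {
        ClassLoader cl = DataSourceWrapDemo.class.getClassLoader();
        Object stubConnection = Proxy.newProxyInstance(
                cl, new Class<?>[] {Connection.class}, (p, m, a) -> null);
        return (DataSource) Proxy.newProxyInstance(
                cl, new Class<?>[] {DataSource.class},
                (p, m, a) -> m.getName().equals("getConnection") ? stubConnection : null);
    }

    static List<String> run() {
        WrappingDataSource ds = new WrappingDataSource(stubDataSource());
        try {
            Connection c = ds.getConnection();
            c.setAutoCommit(false);   // caller code is ordinary JDBC
            c.commit();
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return ds.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [setAutoCommit, commit]
    }
}
```

The caller only ever sees the Connection interface, which is why existing JDBC code keeps working when the DataSource is swapped for a wrapper. DataSourceProxy uses a hand-written ConnectionProxy class rather than a dynamic proxy; the dynamic proxy here just keeps the sketch short.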
ConnectionProxy
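Notes:
- ConnectionProxy extends AbstractConnectionProxy, which implements the Connection interface.
- AbstractConnectionProxy implements most of the Connection interface (everything except the commit and rollback operations).
- ConnectionProxy implements the remaining part: the commit and rollback operations.
ConnectionProxy source code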
public abstract class AbstractConnectionProxy implements Connection {

    protected DataSourceProxy dataSourceProxy;

    protected Connection targetConnection;

    protected String dbType;

    public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection, String dbType) {
        this.dataSourceProxy = dataSourceProxy;
        this.targetConnection = targetConnection;
        this.dbType = dbType;
    }

    public DataSourceProxy getDataSourceProxy() {
        return dataSourceProxy;
    }

    public Connection getTargetConnection() {
        return targetConnection;
    }

    public String getDbType() {
        return dbType;
    }

    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        RootContext.assertNotInGlobalTransaction();
        return targetConnection.prepareCall(sql);
    }
}

public class ConnectionProxy extends AbstractConnectionProxy {

    private ConnectionContext context = new ConnectionContext();

    public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection, String dbType) {
        super(dataSourceProxy, targetConnection, dbType);
    }

    @Override
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }
            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();
        } else {
            targetConnection.commit();
        }
    }

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }

    @Override
    public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction()) {
            if (context.isBranchRegistered()) {
                report(false);
            }
        }
        context.reset();
    }
}
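Notes:
- AbstractConnectionProxy holds two core fields, dataSourceProxy and targetConnection.
- ConnectionProxy extends AbstractConnectionProxy, and its constructor initializes those fields.
- AbstractConnectionProxy overrides createStatement and prepareStatement.
- createStatement() creates a StatementProxy; prepareStatement() creates a PreparedStatementProxy.
- This is how ConnectionProxy hands out proxied Statement objects.
- Inside a global transaction, commit() registers a branch, flushes the undo logs if there are any, commits on the target connection, and reports the outcome; outside one, it simply delegates to targetConnection.commit().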
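The order of steps inside ConnectionProxy.commit() is easier to see with the collaborators stubbed out. The sketch below is a simplified model, not Fescar code: CommitFlowSketch, LocalTx, and the event strings are hypothetical names, branch registration and status reporting are reduced to recorded labels, and the lock-conflict handling from the listing is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the commit() control flow; none of these types are Fescar APIs. */
class CommitFlowSketch {
    /** Stand-in for targetConnection.commit(). */
    interface LocalTx {
        void commit() throws Exception;
    }

    final List<String> events = new ArrayList<>();
    private final boolean inGlobalTx;
    private final boolean hasUndoLog;
    private final LocalTx localTx;

    CommitFlowSketch(boolean inGlobalTx, boolean hasUndoLog, LocalTx localTx) {
        this.inGlobalTx = inGlobalTx;
        this.hasUndoLog = hasUndoLog;
        this.localTx = localTx;
    }

    void commit() throws Exception {
        if (!inGlobalTx) {
            // Outside a global transaction the proxy behaves like plain JDBC.
            localTx.commit();
            events.add("localCommit");
            return;
        }
        events.add("registerBranch");          // corresponds to register() in the listing
        try {
            if (hasUndoLog) {
                events.add("flushUndoLog");    // corresponds to UndoLogManager.flushUndoLogs
            }
            localTx.commit();
            events.add("localCommit");
        } catch (Exception ex) {
            events.add("report(false)");       // tell the coordinator the branch failed
            throw ex;
        }
        events.add("report(true)");
    }

    // Runs one scenario and returns the recorded steps.
    static List<String> run(boolean inGlobalTx, boolean failCommit) {
        CommitFlowSketch sketch = new CommitFlowSketch(inGlobalTx, true, () -> {
            if (failCommit) {
                throw new IllegalStateException("commit failed");
            }
        });
        try {
            sketch.commit();
        } catch (Exception expected) {
            // The failure is already recorded in events.
        }
        return sketch.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false, false)); // prints [localCommit]
        System.out.println(run(true, false));  // prints [registerBranch, flushUndoLog, localCommit, report(true)]
        System.out.println(run(true, true));   // prints [registerBranch, flushUndoLog, report(false)]
    }
}
```

The point of the model is that the local commit sits in the middle: the branch is registered before it, and the result is reported after it, so the caller's connection.commit() call is unchanged while the extra steps happen around it.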
StatementProxy
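Notes:
- StatementProxy extends AbstractStatementProxy, which implements the Statement interface.
- AbstractStatementProxy implements most of the Statement interface, excluding the execute-family methods (executeQuery, executeUpdate, execute).
- StatementProxy implements those execute-family methods.
StatementProxy source code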
public abstract class AbstractStatementProxy<T extends Statement> implements Statement {

    protected AbstractConnectionProxy connectionProxy;

    protected T targetStatement;

    protected String targetSQL;

    public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL)
        throws SQLException {
        this.connectionProxy = connectionProxy;
        this.targetStatement = targetStatement;
        this.targetSQL = targetSQL;
    }

    public AbstractStatementProxy(ConnectionProxy connectionProxy, T targetStatement)
        throws SQLException {
        this(connectionProxy, targetStatement, null);
    }

    public AbstractConnectionProxy getConnectionProxy() {
        return connectionProxy;
    }

    public T getTargetStatement() {
        return targetStatement;
    }

    public String getTargetSQL() {
        return targetSQL;
    }
}

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL)
        throws SQLException {
        super(connectionWrapper, targetStatement, targetSQL);
    }

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement)
        throws SQLException {
        this(connectionWrapper, targetStatement, null);
    }

    @Override
    public ConnectionProxy getConnectionProxy() {
        return (ConnectionProxy) super.getConnectionProxy();
    }

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(Statement statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0]);
            }
        }, sql);
    }

    @Override
    public boolean execute(String sql) throws SQLException {
        // TODO
        return false;
    }
}
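Notes:
- AbstractStatementProxy holds the core fields connectionProxy and targetStatement.
- StatementProxy extends AbstractStatementProxy and initializes both fields in its constructor.
- StatementProxy implements the core executeUpdate and executeQuery operations by handing a callback to ExecuteTemplate; execute(String) is still a TODO in this listing.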
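The callback shape used by executeQuery and executeUpdate can be illustrated in isolation. This is a minimal sketch of the template-and-callback pattern only: ExecuteTemplateSketch, Callback, and FakeStatement are hypothetical names, and the "before"/"after" trace entries are placeholders. What the real ExecuteTemplate does around the callback is not covered in this article and is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the template-plus-callback pattern; not Fescar code. */
class ExecuteTemplateSketch {
    /** Same shape as the callback in the listing: the one call that hits the real statement. */
    interface Callback<R, S> {
        R execute(S statement, Object... args) throws Exception;
    }

    /** Stand-in for a JDBC Statement so the sketch runs without a database. */
    static class FakeStatement {
        int executeUpdate(String sql) {
            return 1;   // pretend exactly one row was affected
        }
    }

    // The template owns the steps around the call; the callback owns the call itself.
    static <R, S> R execute(List<String> trace, S target, Callback<R, S> callback, Object... args)
            throws Exception {
        trace.add("before " + args[0]);
        R result = callback.execute(target, args);
        trace.add("after -> " + result);
        return result;
    }

    // Mirrors how executeUpdate in the listing passes an anonymous callback plus the SQL.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        try {
            execute(trace, new FakeStatement(), new Callback<Integer, FakeStatement>() {
                @Override
                public Integer execute(FakeStatement statement, Object... args) {
                    return statement.executeUpdate((String) args[0]);
                }
            }, "update t set a = 1");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [before update t set a = 1, after -> 1]
    }
}
```

Because the proxy passes the actual JDBC call as a callback, the template can decide what happens before and after it without the proxy knowing those details.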
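What's next
The next article will try to walk through ExecuteTemplate, the core operation. With that, the whole execution chain is connected, and analysing how the RM executes follows naturally.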