Fescar - RM Proxy创建过程

简介: 开篇 这篇文章的主要是目的是解释清楚DataSourceProxy、ConnectionProxy、StatementProxy几个Proxy代理各自的实现以及相互之间的联系。 希望通过这篇文章,能够解释清楚一个核心问题,就是上述的三个Proxy是如何实现代理并同时保证和原有的JDBC的数据访问逻辑保持不变。

开篇

 这篇文章的主要是目的是解释清楚DataSourceProxy、ConnectionProxy、StatementProxy几个Proxy代理各自的实现以及相互之间的联系。

 希望通过这篇文章,能够解释清楚一个核心问题,就是上述的三个Proxy是如何实现代理并同时保证和原有的JDBC的数据访问逻辑保持不变。

 当然,我个人觉得只有理清楚了三个代理的实现机制以后,才能更好的理解RM的工作原理。


JDBC使用方式

public class Demo {
        // 定义数据库的用户名
        private static final String USERNAME = "root";
        // 定义数据库的密码
        private static final String PASSWORD = "123456";
        // 定义数据库的驱动信息
        private static final String DRIVER = "com.mysql.jdbc.Driver";
        // 定义访问数据库的地址
        private static final String URL = "jdbc:mysql://localhost:3306/mydb";
 
        // 定义访问数据库的连接
        private static Connection connection;
        // 定义sql语句的执行对象
        private static PreparedStatement pstmt;
        // 定义查询返回的结果集合
        private static ResultSet resultSet;     
         
        public static void main(String[] args) throws SQLException {
             
             
            try {
                Class.forName(DRIVER);

                connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);

                //用事务,必须设置setAutoCommit false,表示手动提交
                connection.setAutoCommit(false);

                //设置事务的隔离级别。
                connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
                 
                String sql1 = "insert into userinfo(username,pswd) values(?,?)";
                String sql2 = "update userinfo set pswd=? where username = ?";
                pstmt = connection.prepareStatement(sql1);
                pstmt.setString(1, "CAROL");
                pstmt.setString(2, "123");
                pstmt.executeUpdate();
                 
                pstmt = connection.prepareStatement(sql2);
                pstmt.setString(1, "123456");
                pstmt.setString(2, "nicole");               
                pstmt.executeUpdate();
                //提交事务
                connection.commit();
            } catch (Exception e) {
                // 若事务执行有异常,则事务回滚
                connection.rollback();
          }
}

说明:

  • JDBC的使用方式按照获取驱动、获取连接、创建回话、执行操作的顺序执行。
  • RM Proxy的执行原理和JDBC的使用方式基本一致。
  • JDBC的事务操作也在上述的例子当中,事务在Connection维度。


    <bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.account.url}"/>
        <property name="username" value="${jdbc.account.username}"/>
        <property name="password" value="${jdbc.account.password}"/>
        <property name="driverClassName" value="${jdbc.account.driver}"/>
        <property name="initialSize" value="0" />
        <property name="maxActive" value="180" />
        <property name="minIdle" value="0" />
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="Select 'x' from DUAL" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <property name="removeAbandoned" value="true" />
        <property name="removeAbandonedTimeout" value="1800" />
        <property name="logAbandoned" value="true" />
        <property name="filters" value="mergeStat" />
    </bean>

    <bean id="accountDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="accountDataSourceProxy" />
    </bean>

说明:

  • 上述配置来自example例子中的dubbo-account-service.xml文件。
  • 上述核心在于创建了DruidDataSource对象accountDataSource。
  • accountDataSource的作用和JDBC中的DriverManager相同。
  • 从DataSource中获取Connection对象,从Connection对象获取Statement对象。


DataSourceProxy

DataSourceProxy.png

说明:

  • DataSource是与数据源对象代表物理数据源连接的工厂,接口由驱动程序供应商实现。
  • 系统API接口DataSource的介绍参考javax.sql.DataSource之api学习
  • 抽象类AbstractDataSourceProxy实现了DataSource的大部分接口(不包括Connection操作相关接口)。
  • DataSourceProxy实现了核心的接口getConnection实现对于返回Connection代理包装。


DataSourceProxy源码介绍

public abstract class AbstractDataSourceProxy implements DataSource {

    protected DruidDataSource targetDataSource;

    public AbstractDataSourceProxy(DruidDataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    public DruidDataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return targetDataSource.isWrapperFor(iface);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return targetDataSource.getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        targetDataSource.setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        targetDataSource.setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return targetDataSource.getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return targetDataSource.getParentLogger();
    }
}
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

    private String resourceGroupId = "DEFAULT";

    private boolean managed = false;
    
    // 通过构造函数实现代理对象创建
    public DataSourceProxy(DruidDataSource targetDataSource) {
        super(targetDataSource);
    }

    public DataSourceProxy(DruidDataSource targetDataSource, String resourceGroupId) {
        super(targetDataSource);
        this.resourceGroupId = resourceGroupId;
    }

    private void assertManaged() {
        if (!managed) {
            DataSourceManager.get().registerResource(this);
            managed = true;
        }
    }

    public Connection getPlainConnection() throws SQLException {
        return targetDataSource.getConnection();
    }

    public String getDbType() {
        return targetDataSource.getDbType();
    }

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        assertManaged();
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType());
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        assertManaged();
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType());
    }

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    @Override
    public String getResourceId() {
        return targetDataSource.getUrl();
    }
}

说明:

  • AbstractDataSourceProxy实现了DataSource的接口
  • DataSourceProxy继承自AbstractDataSourceProxy,重写了getConnection的方法。
  • DataSourceProxy的构造函数当中传入DruidDataSource targetDataSource
  • getConnection通过DataSource对象获取连接然后包装成ConnectionProxy对象返回
  • Connection targetConnection = targetDataSource.getConnection(username, password)
  • return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType())
  • DataSourceProxy返回ConnectionProxy对象


ConnectionProxy

ConnectionProxy.png

说明:

  • ConnectionProxy继承自AbstractConnectionProxy,AbstractConnectionProxy实现Connection接口。
  • AbstractConnectionProxy实现了Connection的大部分接口(不包括commit操作相关接口)。
  • ConnectionProxy实现了Connection的部分接口(commit相关操作的接口)。


ConnectionProxy源码介绍

public abstract class AbstractConnectionProxy implements Connection {

    protected DataSourceProxy dataSourceProxy;
    protected Connection targetConnection;
    protected String dbType;

    public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection, String dbType) {
        this.dataSourceProxy = dataSourceProxy;
        this.targetConnection = targetConnection;
        this.dbType = dbType;
    }

    public DataSourceProxy getDataSourceProxy() {
        return dataSourceProxy;
    }

    public Connection getTargetConnection() {
        return targetConnection;
    }

    public String getDbType() {
        return dbType;
    }

    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        RootContext.assertNotInGlobalTransaction();
        return targetConnection.prepareCall(sql);
    }
}
public class ConnectionProxy extends AbstractConnectionProxy {

    private ConnectionContext context = new ConnectionContext();

    public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection, String dbType) {
        super(dataSourceProxy, targetConnection, dbType);
    }

    @Override
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }

            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();

        } else {
            targetConnection.commit();
        }
    }

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }

    @Override
    public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction()) {
            if (context.isBranchRegistered()) {
                report(false);
            }}
        context.reset();
    }

}

说明:

  • AbstractConnectionProxy类包含DataSourceProxy和targetConnection两个核心变量。
  • ConnectionProxy继承AbstractConnectionProxy类,ConnectionProxy的构造函数初始化核心变量。
  • AbstractConnectionProxy重写createStatement和prepareStatement方法
  • createStatement()创建StatementProxy对象, prepareStatement()创建PreparedStatementProxy对象
  • ConnectionProxy内部实现了Statement对象代理


StatementProxy

StatementProxy.png
说明:

  • StatementProxy继承自AbstractStatementProxy类,AbstractStatementProxy实现Statement接口。
  • AbstractStatementProxy实现了Statement的大部分接口不包括executeQuery相关的接口。
  • StatementProxy实现了Statement的execute相关的接口。


StatementProxy源码介绍

public abstract class AbstractStatementProxy<T extends Statement> implements Statement {

    protected AbstractConnectionProxy connectionProxy;
    protected T targetStatement;
    protected String targetSQL;

    public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) 
     throws SQLException {
        this.connectionProxy = connectionProxy;
        this.targetStatement = targetStatement;
        this.targetSQL = targetSQL;
    }

    public AbstractStatementProxy(ConnectionProxy connectionProxy, T targetStatement) 
     throws SQLException {
        this(connectionProxy, targetStatement, null);
    }

    public AbstractConnectionProxy getConnectionProxy() {
        return connectionProxy;
    }

    public T getTargetStatement() {
        return targetStatement;
    }

    public String getTargetSQL() {
        return targetSQL;
    }
}
public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) 
     throws SQLException {
        super(connectionWrapper, targetStatement, targetSQL);
    }

    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement) 
     throws SQLException {
        this(connectionWrapper, targetStatement, null);
    }

    @Override
    public ConnectionProxy getConnectionProxy() {
        return (ConnectionProxy) super.getConnectionProxy();
    }

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
            @Override
            public Integer execute(Statement statement, Object... args) throws SQLException {
                return statement.executeUpdate((String) args[0]);
            }
        }, sql);
    }

    @Override
    public boolean execute(String sql) throws SQLException {
        // TODO
        return false;
    }
}

说明:

  • AbstractStatementProxy包含connectionProxy和targetStatement核心变量。
  • StatementProxy继承AbstractStatementProxy类,构造函数中初始化核心的两个变量。
  • StatementProxy实现核心的executeUpdate和executeQuery操作。


期待

 下一篇文章尝试讲解ExecuteTemplate这个核心操作,然后整个执行链路就串联起来了,有了完整的执行链路,分析RM的执行过程也就水到渠成了。

目录
相关文章
|
IDE Go 开发工具
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
在上一讲,我们梳理了`EtcdServer`的关键函数`processInternalRaftRequestOnce`里的四个细节。 其中,`wait.Wait`组件使用里,我们还遗留了一个细节实现,也就是请求的处理结果是怎么通过channel返回的。
113 0
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
|
3月前
|
缓存
EMR Remote Shuffle Service实践问题之Mapper的首次PushData请求如何解决
EMR Remote Shuffle Service实践问题之Mapper的首次PushData请求如何解决
|
3月前
|
SQL 测试技术 流计算
EMR Remote Shuffle Service实践问题之Leader节点变化导致的中断如何解决
EMR Remote Shuffle Service实践问题之Leader节点变化导致的中断如何解决
|
3月前
|
缓存 NoSQL Java
【Azure Redis 缓存】云服务Worker Role中调用StackExchange.Redis,遇见莫名异常(RedisConnectionException: UnableToConnect on xxx 或 No connection is available to service this operation: xxx)
【Azure Redis 缓存】云服务Worker Role中调用StackExchange.Redis,遇见莫名异常(RedisConnectionException: UnableToConnect on xxx 或 No connection is available to service this operation: xxx)
|
5月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
6月前
|
监控 Java 数据库
Zabbix【部署 05】 Docker部署Zabbix Server Agent Agent2 Web interface及 Java-Gate-Way(详细启动脚本及踩坑记录)不定时更新
Zabbix【部署 05】 Docker部署Zabbix Server Agent Agent2 Web interface及 Java-Gate-Way(详细启动脚本及踩坑记录)不定时更新
527 0
|
jenkins 应用服务中间件 持续交付
Jenkins+Gitlab+Nginx实现自动发布与回退基于tag版本的静态项目(解决重复构建问题)
Jenkins+Gitlab+Nginx实现自动发布与回退基于tag版本的静态项目(解决重复构建问题)
236 0
|
Kubernetes 监控 容器
etcd源码分析 - 1.【打通核心流程】etcd server的启动流程
在第一阶段,我将从主流程出发,讲述一个`PUT`指令是怎么将数据更新到`etcd server`中的。今天,我们先来看看server是怎么启动的。
173 0
|
消息中间件 前端开发 Java
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
251 8
阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化
|
Java 微服务 Spring
Seata 客户端需要同时启动 RM 和 TM 吗?
在分析启动部分源码时,我发现 GlobalTransactionScanner 会同时启动 RM 和 TM client,但根据 Seata 的设计来看,TM 负责全局事务的操作,如果一个服务中不需要开启全局事务,此时是不需要启动 TM client的,也就是说项目中如果没有全局事务注解,此时是不是就不需要初始化 TM client 了,因为不是每个微服务,都需要 GlobalTransactional,它此时仅仅作为一个 RM client 而已。
147 0
Seata 客户端需要同时启动 RM 和 TM 吗?