Seata-AT模式+TDDL:排查 构建Insert操作的afterImage时执行SELECT LAST_INSERT_ID()报错

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 构建 afterImage的时候,prepareStatement是复用了Insert操作的prepareStatement,而prepareStatement逻辑中,会在执行 sql 的时候会把参数设置一遍;由于未清空参数,只把 sql 变成了 SELECT LAST_INSERT_ID() ,给没有占位符的 sql 指定参数,就引发了错误
我是石页兄,朋友不因远而疏,高山不隔友谊情;偶遇美羊羊,我们互相鼓励
欢迎关注微信公众号「架构染色」交流和学习

一、问题

代码环境在第三部分。

1)错误信息:

java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).

2)错误现场:

MySQLInsertExecutor#getPkValuesByAuto中,在执行genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");时出现错误:java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).

3)疑问:

"SELECT LAST_INSERT_ID()" 语句确没有参数,那么引发报错的参数从哪里来的呢?

二、故障原因排查

2.1 debug 故障环节

通过 debug 进入到TGroupPreparedStatement#executeQueryOnConnection中(注意:TGroupPreparedStatement是 TDDL 组件中的),看一下里边是什么操作

@Override
protected ResultSet executeQueryOnConnection(Connection conn, String sql)
      throws SQLException {
   PreparedStatement ps = createPreparedStatementInternal(conn, sql);
   Parameters.setParameters(ps, parameterSettings);
   this.currentResultSet = ps.executeQuery();
   return this.currentResultSet;
}

上下文的参数信息如下:

  • sql

    • SELECT LAST_INSERT_ID()
  • parameterSettings

    {Integer@10685} 1 -> {ParameterContext@15869} "setLong(1, 1010)" 
    {Integer@14902} 2 -> {ParameterContext@15870} "setInt(2, 1)" 
    {Integer@14904} 3 -> {ParameterContext@15871} "setTimestamp1(3, 2023-01-03 15:08:32.263)"

    看到这三个参数后,意识到原来它们是Insert记录的时候在prepareStatement中设置的 3 个参数:

INSERT INTO tstock (`sku_id`,`stock_num`,`gmt_created`) VALUES(?,?,?);

2.2 prepareStatement 为什么要设置参数呢?

prepareStatement接口继承自Statement接口,增加了参数占位符功能,当执行 SQL 语句时,可使用“?”作为参数占位符,然后使用其提供的其他方法为占位符设置参数值。其实例对象包含已编译的 SQL 语句,由于已预编译过,所以其执行速度要快于 Statement 对象。因此,多次执行的 SQL 语句经常创建为 PreparedStatement 对象,以提高效率。所以使用参数是很正常的现象。

2.3 原因初定

那么报错的直接原因是构建 afterImage的时候,prepareStatement 是复用了Insert操作的prepareStatement,而prepareStatement逻辑中,会在执行 sql 的时候会把参数设置一遍;由于未清空参数,只把 sql 从 INSERT INTO tstock (sku_id,stock_num,gmt_created) VALUES(?,?,?); 变成了 SELECT LAST_INSERT_ID() ,给没有占位符的 sql 指定参数,就引发了错误:java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).

2.4 处理思路

再来看一下错误现场

截图中可以看出,因为statementProxy.getGeneratedKeys();执行报错,才进入了executeQuery("SELECT LAST_INSERT_ID()")导致了报错,那么:

  1. getGeneratedKeys 是什么情况?这个操作是否可以不报错? (放到其他篇章补充)
  2. prepareStatement 在整个执行的上下文中的生命周期是怎样,此处是否有补偿处理的机会?

本篇先梳理 prepareStatement 在整个执行的上下文中的生命周期是怎样,尝试找一下补偿处理的办法。

三、代码环境梳理

Demo 代码环境是 Seata 全局注解事务中内嵌一个 Spring 注解事务

3.1 @GlobalTransactional 方法

@RequestMapping("createStock/{skuId}/{num}")
@ResponseBody
@GlobalTransactional
public StockDto createStock(@PathVariable Long skuId, @PathVariable Integer num){
    try {
        return  stockService.createStock(skuId,num);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

3.2 @Transactional 方法

@Transactional(rollbackFor = Exception.class,value = "testSeataProxyTransactionManager")
public StockDto createStock(Long skuId, Integer num) throws Exception {
    int delcount = seataProxyStockMapper.delete(skuId);
    System.out.println("delete stock count = "+delcount);
    Stock stock = new Stock(skuId,num);
    int count = seataProxyStockMapper.insert(stock);
    if(count==0){
        throw new Exception("创建库存失败");
    }
    Long id = seataProxyStockMapper.getId();
    StockDto stockDto = JSON.parseObject(JSON.toJSONString(stock),StockDto.class);
    stockDto.setId(id);
    return stockDto;
}

3.3 出问题的环节

Seata 在 seataProxyStockMapper.insert(stock); 环节,AT 模式下数据源代理逻辑中,insert 操作会把刚插入的数据构建成 afterImage ,问题就发生在这里。

其他的一些细节也不太重要,暂不描述。

四、@Transactional 的关键逻辑概述

org.springframework.transaction.interceptor.TransactionInterceptor#invoke中将 createStock 中的方法加上事务能力

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
   Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

   // Adapt to TransactionAspectSupport's invokeWithinTransaction...
   return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
      @Override
      public Object proceedWithInvocation() throws Throwable {
         return invocation.proceed();
      }
   });
}

事务能力在invokeWithinTransaction中,代码如下:

//1 创建事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

Object retVal;
try {
   // 2 执行标注了@Transactional的方法
   retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
   // 处理异常
   completeTransactionAfterThrowing(txInfo, ex);
   throw ex;
}
finally {
   //3. 清除事务上下文信息
   cleanupTransactionInfo(txInfo);
}
//4. 提交事务
commitTransactionAfterReturning(txInfo);

事务处理有三个核心步骤:

  1. 创建事务(并设置 autoCommit 设置为 false)

    org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin中会把 autoCommit 设置为 false。这个设置会对于下文 Seata 执行逻辑有影响,非常重要

    con.setAutoCommit(false);
  2. 执行标注了@Transactional 的方法

    需要关注的是以下三个 mapper 的方法调用,而这三个方法的执行就跟 Seata 的代理逻辑有关了,要梳理处理清楚 prepareStatement 在整个执行的上下文中的生命周期是怎样,mybatis 中的使用逻辑自然是非常重要,必须理清楚。另外因为开启了 Spring 的事务,所以需要注意到,这几个 mybatis 操作是会使用同一个connection对象

    • seataProxyStockMapper.delete(skuId);//构建 beforeImage,afterImage 是空
    • seataProxyStockMapper.insert(stock);//构建 afterImage,beforeImage 是空
    • seataProxyStockMapper.getId();//查询类操作,无数据变化不需要构建镜像
  3. 提交事务-commitTransactionAfterReturning

    这里是重点了,第六部分中进行分析。

五、mybatis 侧的调用逻辑开始

seataProxyStockMapper.delete(skuId)seataProxyStockMapper.insert(stock)都是对应与 mybatis 的代码逻辑org.apache.ibatis.executor.SimpleExecutor#doUpdate,在其中大家可看到三个重要且跟本篇问题密切相关的 JDBC 对象操作

@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
  Statement stmt = null;
  try {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    stmt = prepareStatement(handler, ms.getStatementLog());// 1
    return handler.update(stmt);//2
  } finally {
    closeStatement(stmt);//3
  }
}
  1. 创建prepareStatement

    • prepareStatement(handler, ms.getStatementLog());
  2. 通过prepareStatement执行 sql

    • update(stmt);
  3. 释放prepareStatement

    • closeStatement(stmt);

这三步是 JDBC 执行 SQL 的基本操作,本篇上下文重点关注这期间 prepareStatement的创建与释放。

5.1 创建 prepareStatement

创建发生在 doUpdate中的prepareStatement方法内

stmt = prepareStatement(handler, ms.getStatementLog()); //1

源码在 org.apache.ibatis.executor.SimpleExecutor#prepareStatement,其中有 2 个关键操作

  1. 创建prepareStatement
  2. prepareStatement 设置参数,这便是参数的来源,也是本篇的关键之处
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
  Statement stmt;
  Connection connection = getConnection(statementLog);
  stmt = handler.prepare(connection, transaction.getTimeout());
  handler.parameterize(stmt);
  return stmt;
}

其中创建prepareStatement的代码是经由 io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement后,新建一个TGroupPreparedStatement

5.2 通过 prepareStatement 执行 sql

因为 Spring 的事务处理中将 autoCommit 设置为了 false,所以这边最终是执行了executeAutoCommitFalse方法,这是Seata AT 模式下的的关键方法,包含以下步骤:

  1. 构建 beforeImage
  2. 执行业务 sql(本篇出问题时是执行了 insert )
  3. 构建 afterImage
  4. 将 beforeImage 和 afterImage 构建成 undo_log
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    //...
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    int updateCount = statementProxy.getUpdateCount();
    if (updateCount > 0) {
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
    }
    return result;
}

另外 本问题发生时的 sql 是 insert 类型,所以对应的 executor 是 MySQLInsertExecutor

1) beforeImage中如何使用prepareStatement

MySQLInsertExecutorbeforeImage 构建时,在buildTableRecords中查询记录时,是创建了一个新的prepareStatement

try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) {

对应的堆栈情况如下:

buildTableRecords:399, BaseTransactionalExecutor (io.seata.rm.datasource.exec)
beforeImage:60, DeleteExecutor (io.seata.rm.datasource.exec)
executeAutoCommitFalse:99, AbstractDMLBaseExecutor (io.seata.rm.datasource.exec)

2) 执行 sql 时,使用的是 mybatis 的逻辑内发起构建的一个prepareStatement

3) 构建 afterImage
对应BaseInsertExecutor#afterImage的源码

protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    //1. 获取所有新插入记录的主键值集合
    Map<String, List<Object>> pkValues = getPkValues();
    //2. 根据主键查询刚插入的记录,构建成 TableRecords
    TableRecords afterImage = buildTableRecords(pkValues);
    if (afterImage == null) {
        throw new SQLException("Failed to build after-image for insert");
    }
    return afterImage;
}

这里是分 2 个步骤:

  1. 获取所有新插入记录的主键值集合

    • MySQLInsertExecutorafterImage 构建时,进入到MySQLInsertExecutor#getPkValuesByAuto中,正是报错的发生地,所使用的prepareStatement 也是执行 mapper的insert方法时构建的prepareStatement,因这个prepareStatement刚刚执行了 insert 操作,里边还存有 insert 操作所构建的参数(错误的诱因),这些参数用于执行查询 sql "SELECT LAST_INSERT_ID()" 时报错。

      genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
  2. 根据主键查询刚插入的记录

    • buildTableRecords方法中跟构建前镜像一样,是创建了新的PreparedStatement,并且通过try-with-resource 的方式保障这个PreparedStatement资源释放。

      try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQLJoin.toString())) {

4) 将 beforeImage 和 afterImage 构建成 undo_log

BaseTransactionalExecutor#prepareUndoLog中完成 undo_log 的处理,从下边核心代码中可以看出 sqlUndoLog 是被 connectionProxy 对象的 appendUndoLog 方法处理,

String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
    connectionProxy.appendLockKey(lockKeys);

    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}

appendUndoLog内部是将 undo_log 添加到了ConnectionContext中,处理逻辑是在io.seata.rm.datasource.ConnectionContext#appendUndoItem

void appendUndoItem(SQLUndoLog sqlUndoLog) {
    sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
}

源码可知,这个环节并没有 insert undo_log 的操作,真实插入 undo_log 的逻辑是在io.seata.rm.datasource.undo.UndoLogManager#flushUndoLogs内,触发时机是在io.seata.rm.datasource.ConnectionProxy#commit

  1. 插入 undo_log 的prepareStatement是哪个?

    • 1io.seata.rm.datasource.undo.mysql.MySQLUndoLogManager#insertUndoLog,是新建了一个prepareStatement

      try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL))
  2. 插入 undo_log 的 connection 是哪个?

    • 从下边源码中可知使用的是 ConnectionProxy 中的 connection,本篇的场景中是 Spring 事务中的第一个mapper所创建的 connection对象(整个 Spring 事务中都使用这一个connection对象)

      insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());

5.3 释放prepareStatement?No

closeStatement(stmt)的代码是在 org.apache.ibatis.executor.BaseExecutor#closeStatement中,如下

   protected void closeStatement(Statement statement) {
     if (statement != null) {
       try {
         if (!statement.isClosed()) {
           statement.close();
         }
       } catch (SQLException e) {
         // ignore
       }
     }
   }

closeStatement(...) 内会先判断 statement 是否已关闭,若未关闭才关闭。 不巧的是下边TGroupPreparedStatement#isClosed代码中抛了异常:

public boolean isClosed() throws SQLException {
   throw new SQLException("not support exception");
}

所以closeStatement中并没有真实的关闭PreparedStatement,现在看来能关闭PreparedStatement的地方只能推测为,当Spring事务结束后,关闭connection的时候顺带把其创建的PreparedStatement也释放掉了。接下来一起来看下。

六、Spring 提交事务后,释放 Connection 资源

Spring 提交事务的方法 commitTransactionAfterReturning,其调用堆栈有点深,从下边调用堆栈中可以看出,在commitTransactionAfterReturning方法中,最终会执行ConnectionProxy#close方法,从而把 connection 资源释放掉。

close:147, AbstractConnectionProxy (io.seata.rm.datasource)
doCloseConnection:348, DataSourceUtils (org.springframework.jdbc.datasource)
doReleaseConnection:335, DataSourceUtils (org.springframework.jdbc.datasource)
releaseConnection:302, DataSourceUtils (org.springframework.jdbc.datasource)
doCleanupAfterCompletion:370, DataSourceTransactionManager (org.springframework.jdbc.datasource)
cleanupAfterCompletion:1021, AbstractPlatformTransactionManager (org.springframework.transaction.support)
processCommit:815, AbstractPlatformTransactionManager (org.springframework.transaction.support)
commit:734, AbstractPlatformTransactionManager (org.springframework.transaction.support)
commitTransactionAfterReturning:521, TransactionAspectSupport (org.springframework.transaction.interceptor)
invokeWithinTransaction:293, TransactionAspectSupport (org.springframework.transaction.interceptor)

创建 Connection 的地方有setAutoCommit(false),而对应反向操作setAutoCommit(true)则在doCleanupAfterCompletion中,源码如下:

protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }

    Connection con = txObject.getConnectionHolder().getConnection();

    try {
        if (txObject.isMustRestoreAutoCommit()) {
            //1.设置commit 为true
            con.setAutoCommit(true);
        }

        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    } catch (Throwable var5) {
        this.logger.debug("Could not reset JDBC Connection after transaction", var5);
    }

    if (txObject.isNewConnectionHolder()) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
        }
        //2. 释放链接
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }

    txObject.getConnectionHolder().clear();
}

以上 doCleanupAfterCompletion 中有两个重要事项

  1. con.setAutoCommit(true);
  2. DataSourceUtils.releaseConnection(con, this.dataSource);

    TGroupConnection#close 中会关闭所有的statement,代码如下:

    public void close() throws SQLException {
        ...
        // 关闭statement
        for (TGroupStatement stmt : openedStatements) {
         try {
            stmt.close(false);
         } catch (SQLException e) {
            exceptions.add(e);
         }
        }
        ...
        if (wBaseConnection != null && !wBaseConnection.isClosed()) {
           wBaseConnection.close();
        }

七、小结

在本篇案例的上下文中,connection 的创建与释放 以及其创建的 statement 的创建以及释放都梳理了。情况总结一下:

  1. @Transactional注解,开启了 Spring 的事务
  2. 在此事务中创建了一个 Connection 对象
  3. 在多个 sql 执行的过程中,通过此 Connection 对象创建了多个PreparedStatement对象,有些PreparedStatement对象在使用后就立即释放了。
  4. Spring事务结束的时候释放connection,以及connection中剩余的PreparedStatement对象

从本次 Demo 上下文来看,对于报错之处statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");来说以下两种修复方案似乎都可以考虑:

  1. 执行SELECT LAST_INSERT_ID()时,新建一个PreparedStatement
  2. 若复用PreparedStatement,也可用clearParameters()方法将 Insert 时设置的三个参数清除掉

后续会对 SELECT LAST_INSERT_ID() 做进一步的调研,并结合更多的场景验证修复方案的可行性。

以上两种方案应该采用哪种,会有什么弊端?欢迎读者老师讨论给出建议。

八、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
Nacos 微服务
Seata常见问题之Seata报错Failed to fetch schema of t_table如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
2月前
|
Linux Apache 微服务
Seata常见问题之Seata关闭自动代理提交数据时候报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
2月前
|
Java 数据库 微服务
Seata常见问题之Seata的jdk17启动seata1.7.0报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
2月前
|
弹性计算 Java 微服务
Seata常见问题之客户端集成了seata 注册会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
2月前
|
SQL NoSQL 关系型数据库
Seata常见问题之Seata报错Unknown SQLExpr如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
2月前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
111 2
|
2月前
|
Java 关系型数据库 微服务
Seata常见问题之项目一直启动不成功如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
78 0
|
2月前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
122 0
|
2月前
|
数据库连接 应用服务中间件 数据库
Seata常见问题之访问seata 7091端口提示报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
142 0
|
2月前
|
Nacos 数据库
分布式事务解决方案Seata
分布式事务解决方案Seata
27 1

热门文章

最新文章

相关课程

更多