某产品线的某个RocketMQ GroupID消息大量积压;DBA反馈该应用使用的TBase数据库带宽流量飙升,TBase数据库的CPU、Load处于正常范围。
update test set a = ?,b = ?,c = ? where d = ?
TBase JDBC Driver
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.3.3</version> </dependency>
Extended Query
Extended Query 协议将以上 Simple Query 的处理流程分为若干步骤,每一步都由单独的服务端消息进行确认。该协议可以使用服务端的 perpared-statement 功能,即先发送一条参数化 SQL,服务端收到 SQL(Statement)之后对其进行解析、重写并保存,这里保存的 Statement 也就是所谓 Prepared-statement,可以被复用;执行 SQL 时,直接获取事先保存的 Prepared-statement 生成计划并执行,避免对同类型 SQL 重复解析和重写。
如下例, SELECT * FROM users u, logs l WHERE u.usrid=$1 AND u.usrid=l.usrid AND l.date = $2; 是一条参数化 SQL,执行 PREPARE 时,服务端对该 SQL 进行解析和重写;执行 EXECUTE 时,为 Prepared Statement 生成计划并执行。第二次执行 EXECUTE 时无需再对 SQL 进行解析和重写,直接生成计划并执行即可。
可见,Extended Query 协议通过使用服务端的 Prepared Statement,提升同类 SQL 多次执行的效率。但与 Simple Query 相比,其不允许在一个请求中包含多条 SQL 命令,否则会报语法错误。
Extended Query 协议通常包括 5 个步骤,分别是 Parse,Bind,Describe,Execute 和 Sync。以下分别介绍各个阶段的处理流程。
客户端首先向服务端发送一个 Parse 消息,该消息包括参数化 SQL,参数占位符以及每个参数的类型,还可以指定 Statement 的名字,若不指定名字,即为一个 “未命名” 的 Statement,该 Statement 会在生成下一个 “未命名” Statement 时予以销毁,若指定名字,则必须在下次发送 Parse 消息前将其显式销毁。
PostgreSQL 服务端收到该消息后,调用 exec_parse_message 函数进行处理,进行语法分析、语义分析和重写,同时会创建一个 Plan Cache 的结构,用于缓存后续的执行计划。
客户端发送 Bind 消息,该消息携带具体的参数值、参数格式和返回列的格式,如下:
PostgreSQL 收到该消息后,调用 exec_bind_message 函数进行处理。为之前保存的 Prepared Statement 创建执行计划并将其保存在 Plan Cache 中,创建一个 Portal 用于后续执行。关于 Plan Cache 的具体实现和复用逻辑在此不细述,以后单独撰文介绍。
在 PostgreSQL 内核中,Portal 是对查询执行状态的一种抽象,该结构贯穿执行器运行的始终。
客户端可以发送 Describe 消息获取 Statment 或 Portal 的元信息,即返回结果的列名,类型等信息,这些信息由 RowDescription 消息携带。如果请求获取 Statement 的元信息,还会返回具体的参数信息,由 ParameterDescription 消息携带。
客户端发送 Execute 消息告知服务端执行请求,服务端收到消息后,执行 Bind 阶段创建的 Portal,执行结果通过 DataRow 消息返回给客户端,执行完成后发送 CommandComplete 。
Execute 消息中可以指定返回的行数,若行数为 0,表示返回所有行。
使用 Extended Query 协议时,一个请求总是以 Sync 消息结束,服务端接收到 Sync 消息后,关闭隐式开启的事务并回复 ReadyForQuery 消息。
Extended Query 完整的消息流如下:
另外一个思路是从应用程序侧分析TBase JDBC Driver发送请求的逻辑。
TBase JDBC Driver
ts=2024-01-23 00:23:56;thread_name=ConsumeMessageThread_12;id=14a;is_daemon=false;priority=5;TCCL=org.apache.catalina.loader.ParallelWebappClassLoader@246b0ff2;trace_id=0a04cb0d17059202723904892d7747;rpc_id= @org.postgresql.core.v3.QueryExecutorImpl.sendQuery() at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:347) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315) at org.postgresql.jdbc.PgPreparedStatement.getParameterMetaData(PgPreparedStatement.java:1659) at com.alibaba.druid.pool.DruidPooledPreparedStatement.getParameterMetaData(DruidPooledPreparedStatement.java:710) at org.springframework.jdbc.core.StatementCreatorUtils.setNull(StatementCreatorUtils.java:276) at org.springframework.jdbc.core.StatementCreatorUtils.setParameterValueInternal(StatementCreatorUtils.java:235) at org.springframework.jdbc.core.StatementCreatorUtils.setParameterValue(StatementCreatorUtils.java:169) at org.springframework.jdbc.core.ArgumentPreparedStatementSetter.doSetValue(ArgumentPreparedStatementSetter.java:66) at org.springframework.jdbc.core.ArgumentPreparedStatementSetter.setValues(ArgumentPreparedStatementSetter.java:47) at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:875) at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:870) at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633) at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:870) at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:931) at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:941) at xxxxx at com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl$MessageListenerImpl.consumeMessage(ConsumerImpl.java:116) at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:710) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
import com.alibaba.druid.pool.DruidDataSource; import org.springframework.jdbc.core.JdbcTemplate; import java.sql.SQLException; public class JdbcTemplateTest { public static void main(String[] args) throws SQLException { DruidDataSource dataSource = new DruidDataSource(); dataSource.setDriverClassName("org.postgresql.Driver"); dataSource.setUsername("postgres"); dataSource.setPassword("admin"); dataSource.setUrl("jdbc:postgresql://localhost:5432/postgres"); dataSource.setTimeBetweenEvictionRunsMillis(10 * 60 * 1000); dataSource.setInitialSize(0); dataSource.setPhyTimeoutMillis(1000); dataSource.setTestWhileIdle(false); dataSource.setFilters("stat"); dataSource.init(); JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); String sql = "update test set name = ?,address = ? where id = ?"; Object[] params = new Object[3]; params[0] = null; params[1] = null; params[2] = 1; int size = jdbcTemplate.update(sql,params); System.out.println(size); } }
public void setValues(PreparedStatement ps) throws SQLException { if (this.args != null) { for (int i = 0; i < this.args.length; i++) { Object arg = this.args[i]; doSetValue(ps, i + 1, arg); } } } protected void doSetValue(PreparedStatement ps, int parameterPosition, Object argValue) throws SQLException { if (argValue instanceof SqlParameterValue) { SqlParameterValue paramValue = (SqlParameterValue) argValue; StatementCreatorUtils.setParameterValue(ps, parameterPosition, paramValue, paramValue.getValue()); } else { StatementCreatorUtils.setParameterValue(ps, parameterPosition, SqlTypeValue.TYPE_UNKNOWN, argValue); } }
private static void setParameterValueInternal(PreparedStatement ps, int paramIndex, int sqlType, String typeName, Integer scale, Object inValue) throws SQLException { String typeNameToUse = typeName; int sqlTypeToUse = sqlType; Object inValueToUse = inValue; // override type info? if (inValue instanceof SqlParameterValue) { SqlParameterValue parameterValue = (SqlParameterValue) inValue; if (logger.isDebugEnabled()) { logger.debug("Overriding type info with runtime info from SqlParameterValue: column index " + paramIndex + ", SQL type " + parameterValue.getSqlType() + ", type name " + parameterValue.getTypeName()); } if (parameterValue.getSqlType() != SqlTypeValue.TYPE_UNKNOWN) { sqlTypeToUse = parameterValue.getSqlType(); } if (parameterValue.getTypeName() != null) { typeNameToUse = parameterValue.getTypeName(); } inValueToUse = parameterValue.getValue(); } if (logger.isTraceEnabled()) { logger.trace("Setting SQL statement parameter value: column index " + paramIndex + ", parameter value [" + inValueToUse + "], value class [" + (inValueToUse != null ? inValueToUse.getClass().getName() : "null") + "], SQL type " + (sqlTypeToUse == SqlTypeValue.TYPE_UNKNOWN ? "unknown" : Integer.toString(sqlTypeToUse))); } if (inValueToUse == null) { setNull(ps, paramIndex, sqlTypeToUse, typeNameToUse); } else { setValue(ps, paramIndex, sqlTypeToUse, typeNameToUse, scale, inValueToUse); } } private static void setNull(PreparedStatement ps, int paramIndex, int sqlType, String typeName) throws SQLException { if (sqlType == SqlTypeValue.TYPE_UNKNOWN || sqlType == Types.OTHER) { boolean useSetObject = false; Integer sqlTypeToUse = null; DatabaseMetaData dbmd = null; String jdbcDriverName = null; boolean tryGetParameterType = true; if (shouldIgnoreGetParameterType == null) { try { dbmd = ps.getConnection().getMetaData(); jdbcDriverName = dbmd.getDriverName(); tryGetParameterType = !driversWithNoSupportForGetParameterType.contains(jdbcDriverName); if (tryGetParameterType && jdbcDriverName.startsWith("Oracle")) { // Avoid getParameterType use with Oracle 12c driver by default: // needs to be explicitly activated through spring.jdbc.getParameterType.ignore=false tryGetParameterType = false; driversWithNoSupportForGetParameterType.add(jdbcDriverName); } } catch (Throwable ex) { logger.debug("Could not check connection metadata", ex); } } else { tryGetParameterType = !shouldIgnoreGetParameterType; } if (tryGetParameterType) { try { // 问题出在ps.getParameterMetaData()调用上 sqlTypeToUse = ps.getParameterMetaData().getParameterType(paramIndex); } catch (Throwable ex) { if (logger.isDebugEnabled()) { logger.debug("JDBC 3.0 getParameterType call not supported - using fallback method instead: " + ex); } } } if (sqlTypeToUse == null) { // JDBC driver not compliant with JDBC 3.0 -> proceed with database-specific checks sqlTypeToUse = Types.NULL; try { if (dbmd == null) { dbmd = ps.getConnection().getMetaData(); } if (jdbcDriverName == null) { jdbcDriverName = dbmd.getDriverName(); } if (shouldIgnoreGetParameterType == null) { // Register JDBC driver with no support for getParameterType, except for the // Oracle 12c driver where getParameterType fails for specific statements only // (so an exception thrown above does not indicate general lack of support). driversWithNoSupportForGetParameterType.add(jdbcDriverName); } String databaseProductName = dbmd.getDatabaseProductName(); if (databaseProductName.startsWith("Informix") || (jdbcDriverName.startsWith("Microsoft") && jdbcDriverName.contains("SQL Server"))) { // "Microsoft SQL Server JDBC Driver 3.0" versus "Microsoft JDBC Driver 4.0 for SQL Server" useSetObject = true; } else if (databaseProductName.startsWith("DB2") || jdbcDriverName.startsWith("jConnect") || jdbcDriverName.startsWith("SQLServer")|| jdbcDriverName.startsWith("Apache Derby")) { sqlTypeToUse = Types.VARCHAR; } } catch (Throwable ex) { logger.debug("Could not check connection metadata", ex); } } if (useSetObject) { ps.setObject(paramIndex, null); } else { ps.setNull(paramIndex, sqlTypeToUse); } } else if (typeName != null) { ps.setNull(paramIndex, sqlType, typeName); } else { ps.setNull(paramIndex, sqlType); } }
public ParameterMetaData getParameterMetaData() throws SQLException { int flags = QueryExecutor.QUERY_ONESHOT | QueryExecutor.QUERY_DESCRIBE_ONLY | QueryExecutor.QUERY_SUPPRESS_BEGIN; StatementResultHandler handler = new StatementResultHandler(); connection.getQueryExecutor().execute(preparedQuery.query, preparedParameters, handler, 0, 0, flags); int[] oids = preparedParameters.getTypeOIDs(); return createParameterMetaData(connection, oids); }
private void sendOneQuery(SimpleQuery query, SimpleParameterList params, int maxRows, int fetchSize, int flags) throws IOException { boolean asSimple = (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0; if (asSimple) { assert (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) == 0 : "Simple mode does not support describe requests. sql = " + query.getNativeSql() + ", flags = " + flags; sendSimpleQuery(query, params); return; } assert !query.getNativeQuery().multiStatement : "Queries that might contain ; must be executed with QueryExecutor.QUERY_EXECUTE_AS_SIMPLE mode. " + "Given query is " + query.getNativeSql(); // Per https://www.postgresql.org/docs/current/static/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY // A Bind message can use the unnamed prepared statement to create a named portal. // If the Bind is successful, an Execute message can reference that named portal until either // the end of the current transaction // or the named portal is explicitly destroyed boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0; boolean noMeta = (flags & QueryExecutor.QUERY_NO_METADATA) != 0; boolean describeOnly = (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) != 0; // extended queries always use a portal // the usePortal flag controls whether or not we use a *named* portal boolean usePortal = (flags & QueryExecutor.QUERY_FORWARD_CURSOR) != 0 && !noResults && !noMeta && fetchSize > 0 && !describeOnly; boolean oneShot = (flags & QueryExecutor.QUERY_ONESHOT) != 0; boolean noBinaryTransfer = (flags & QUERY_NO_BINARY_TRANSFER) != 0; boolean forceDescribePortal = (flags & QUERY_FORCE_DESCRIBE_PORTAL) != 0; // Work out how many rows to fetch in this pass. int rows; if (noResults) { rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum } else if (!usePortal) { rows = maxRows; // Not using a portal -- fetchSize is irrelevant } else if (maxRows != 0 && fetchSize > maxRows) { // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true) rows = maxRows; } else { rows = fetchSize; // maxRows > fetchSize } sendParse(query, params, oneShot); // Must do this after sendParse to pick up any changes to the // query's state. // boolean queryHasUnknown = query.hasUnresolvedTypes(); boolean paramsHasUnknown = params.hasUnresolvedTypes(); boolean describeStatement = describeOnly || (!oneShot && paramsHasUnknown && queryHasUnknown && !query.isStatementDescribed()); if (!describeStatement && paramsHasUnknown && !queryHasUnknown) { int[] queryOIDs = castNonNull(query.getPrepareTypes()); int[] paramOIDs = params.getTypeOIDs(); for (int i = 0; i < paramOIDs.length; i++) { // Only supply type information when there isn't any // already, don't arbitrarily overwrite user supplied // type information. if (paramOIDs[i] == Oid.UNSPECIFIED) { params.setResolvedType(i + 1, queryOIDs[i]); } } } if (describeStatement) { sendDescribeStatement(query, params, describeOnly); if (describeOnly) { return; } } // Construct a new portal if needed. Portal portal = null; if (usePortal) { String portalName = "C_" + (nextUniqueID++); portal = new Portal(query, portalName); } sendBind(query, params, portal, noBinaryTransfer); // A statement describe will also output a RowDescription, // so don't reissue it here if we've already done so. // if (!noMeta && !describeStatement) { /* * don't send describe if we already have cached the row description from previous executions * * XXX Clearing the fields / unpreparing the query (in sendParse) is incorrect, see bug #267. * We might clear the cached fields in a later execution of this query if the bind parameter * types change, but we're assuming here that they'll still be valid when we come to process * the results of this query, so we don't send a new describe here. We re-describe after the * fields are cleared, but the result of that gets processed after processing the results from * earlier executions that we didn't describe because we didn't think we had to. * * To work around this, force a Describe at each execution in batches where this can be a * problem. It won't cause more round trips so the performance impact is low, and it'll ensure * that the field information available when we decoded the results. This is undeniably a * hack, but there aren't many good alternatives. */ if (!query.isPortalDescribed() || forceDescribePortal) { sendDescribePortal(query, portal); } } sendExecute(query, portal, rows); }
通过以上分析可知,造成流量增大的原因是Spring JdbcTemplate中针对值为null的字段,调用了PgPreparedStatement.getParameterMetaData导致的。
jdbcTemplate.update(sql, new PreparedStatementSetter() { @Override public void setValues(PreparedStatement ps) throws SQLException { ps.setString(1,null); ps.setString(2,null); ps.setInt(3,1); } });
MySQL JDBC Driver是否存在这个问题?经过分析相关代码,及ECS带宽流量,可以确定不存在这个问题。
public ParameterMetaData getParameterMetaData() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { if (this.parameterMetaData == null) { if (this.connection.getGenerateSimpleParameterMetadata()) { this.parameterMetaData = new MysqlParameterMetadata(this.parameterCount); } else { this.parameterMetaData = new MysqlParameterMetadata(null, this.parameterCount, getExceptionInterceptor()); } } return this.parameterMetaData; } }