我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? 我的版本是1.10.0,代码如下 JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() .setTableSchema(results.getSchema()) .setOptions(JDBCOptions.builder() .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") .setDriverName("com.mysql.jdbc.Driver") .setUsername("jczx_cjch") .setPassword("jczx_cjch2") .setTableName("xkf_join_result") .build()) .setFlushIntervalMills(1000) .setFlushMaxSize(100) .setMaxRetryTimes(3) .build();
DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); retract.print(); build.emitDataStream(retract);
就会出现如下错误 java.sql.SQLException: No value specified for parameter 1 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 我查看源码发现 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 @Override public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { checkFlushException(); try { jdbcWriter.addRecord(tuple2); batchCount++; if (batchCount >= flushMaxSize) { flush(); } } catch (Exception e) { throw new RuntimeException("Writing records to JDBC failed.", e); } } 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 public synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } }
然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充;
@Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } }*来自志愿者整理的FLINK邮件归档
没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new long[0];
}
```*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。