开发者社区> 问答> 正文

Flink 1.10 的 JDBCUpsertOutputFormat flush方法的有问题

我们在测试环境测试JDBC写入postgresql的场景,用tcpkill模拟链接被关闭的情况,测试对异常的兼容性,我们发现一直打印类似的日志 

2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1 

org.postgresql.util.PSQLException: This connection has been closed. 

at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857) 

at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817) 

at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813) 

at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873) 

at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569) 

at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62) 

at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) 

at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124) 

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 

at java.lang.Thread.run(Thread.java:748) 

从日志上看来,链接被关闭,只会导致重试一次,这里是Flink的重试代码 

//JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception { 

checkFlushException(); 

for (int i = 1; i <= maxRetryTimes; i++) { 

try { 

jdbcWriter.executeBatch(); 

batchCount = 0; 

break; 

} catch (SQLException e) { 

LOG.error("JDBC executeBatch error, retry times = {}", i, e); 

if (i >= maxRetryTimes) { 

throw e; 

Thread.sleep(1000 * i); 

通过远程debug分析,在第一次执行 

JDBCUpsertOutputFormat.flush 

  -> AppendOnlyWriter.executeBatch 

     ... 

     -> PgConnection.getAutoCommit 

抛出PSQLException: This connection has been closed时,batchStatements在这之前已经被清空 

// PgStatement.java private BatchResultHandler internalExecuteBatch() throws SQLException { // Construct query/parameter arrays. transformQueriesAndParameters(); // Empty arrays should be passed to toArray // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/ Query[] queries = batchStatements.toArray(new Query[0]); ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]); batchStatements.clear(); // 这里已经被清空 batchParameters.clear(); ... if (connection.getAutoCommit()) { // 抛出异常 flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } ... } 

所以在Flink第二次重试执行这段代码的时候jdbcWriter.executeBatch的时候,batchStatements已经为Empty并直接返回了,导致Flink认为是成功的,这样会导致其实语句没有被执行? 

// PgStatement.java public int[] executeBatch() throws SQLException { checkClosed(); closeForNextExecution(); if (batchStatements == null || batchStatements.isEmpty()) { //这里就直接返回了 return new int[0]; } return internalExecuteBatch().getUpdateCount(); } 

目前的想法是,当我出现异常捕获的时候,在重试之前,判断这个链接是否已经被关闭,如果已经被关闭,则重新open,不知道大家有啥看法,这是我提的issue 

https://issues.apache.org/jira/browse/FLINK-16708*来自志愿者整理的flink邮件归档

展开
收起
玛丽莲梦嘉 2021-12-02 16:39:00 623 0
1 条回答
写回答
取消 提交回答
  • 是否[1]能解决你的问题呢?还是说需要单独判断不同的exceptions? 

    [1] https://issues.apache.org/jira/browse/FLINK-16281*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:33:49
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载