Flink SQL consumes Kafka, and a custom Phoenix connector is used to look up a dimension table
The job runs fine for a while after startup, sometimes about a week, and then dies. The log shows:
2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 2
java.sql.SQLException: null
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
    at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    ... 20 more
2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
java.sql.SQLException: null
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
    at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    ... 20 more
2020-11-24 00:52:40,635 WARN org.apache.flink.runtime.taskmanager.Task switched from RUNNING to FAILED.
java.lang.RuntimeException: Execution of JDBC statement failed.
Could anyone help me figure out where the problem is? *From the Flink mailing list archive, compiled by volunteers
Judging from your stack trace, your custom "com.custom.jdbc.table.JdbcRowDataLookupFunction" is referencing the wrong PreparedStatement package. For a concrete implementation you can refer to: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java My understanding is that if Phoenix supports the standard SQL protocol, you could also use the provided JdbcRowDataLookupFunction directly? *From the Flink mailing list archive, compiled by volunteers
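For reference, here is a minimal sketch of the retry-and-reconnect pattern that the linked JdbcRowDataLookupFunction follows. This is illustrative code only, not the poster's connector and not Flink's exact source; the class name, JDBC URL, query, and dimension-table name are placeholders. The idea is that on a SQLException the old statement and connection are discarded and re-created before retrying, rather than re-executing a statement handle that the Phoenix Query Server may no longer know about (which is what the NoSuchStatementException in the log suggests).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;  // use the standard java.sql interface, not an Avatica/shaded class
import java.sql.ResultSet;
import java.sql.SQLException;

public class PhoenixLookupExample {

    // Hypothetical Phoenix Query Server (thin client) URL; assumes the thin driver
    // org.apache.phoenix.queryserver.client.Driver is on the classpath.
    private static final String URL =
            "jdbc:phoenix:thin:url=http://phoenix-queryserver:8765;serialization=PROTOBUF";
    private static final String QUERY = "SELECT name FROM DIM_TABLE WHERE id = ?";
    private static final int MAX_RETRIES = 3;

    private Connection connection;
    private PreparedStatement statement;

    private void establishConnectionAndStatement() throws SQLException {
        connection = DriverManager.getConnection(URL);
        statement = connection.prepareStatement(QUERY);
    }

    /** Looks up one key, re-opening the connection and re-preparing the statement on failure. */
    public String lookup(long id) throws Exception {
        if (statement == null) {
            establishConnectionAndStatement();
        }
        for (int retry = 0; retry <= MAX_RETRIES; retry++) {
            try {
                statement.clearParameters();
                statement.setLong(1, id);
                try (ResultSet rs = statement.executeQuery()) {
                    return rs.next() ? rs.getString(1) : null;
                }
            } catch (SQLException e) {
                if (retry >= MAX_RETRIES) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }
                // The server side may have dropped the prepared statement, so tear down and
                // rebuild both the connection and the statement before the next attempt.
                try { statement.close(); } catch (SQLException ignored) { }
                try { connection.close(); } catch (SQLException ignored) { }
                establishConnectionAndStatement();
                Thread.sleep(1000L * (retry + 1));
            }
        }
        return null; // unreachable in practice: the loop either returns or throws
    }
}
```

The two points this sketch tries to make concrete are the ones raised in the reply above: reference java.sql.PreparedStatement rather than a Phoenix/Avatica-specific class, and re-prepare the statement after reconnecting so a server-side statement expiry does not fail every retry in a row.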