Question 1: Flink SQL queries a dimension table through Phoenix and reports an error
Flink SQL consumes from Kafka and uses a custom Phoenix connector to query a dimension table.
After the job has been running for a while, sometimes about a week, it dies. The log shows:
2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 2
java.sql.SQLException: null
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
    at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    ... 20 more

2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
java.sql.SQLException: null
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
    at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    ... 20 more

2020-11-24 00:52:40,635 WARN org.apache.flink.runtime.taskmanager.Task switched from RUNNING to FAILED.
java.lang.RuntimeException: Execution of JDBC statement failed.
Could anyone help me figure out where the problem is? *From the volunteer-compiled Flink mailing list archive
Reference answer:
From your stack trace, the PreparedStatement referenced by your custom "com.custom.jdbc.table.JdbcRowDataLookupFunction" comes from the wrong package. For a concrete implementation you can refer to: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java. My understanding is that if Phoenix speaks the standard SQL/JDBC protocol, you could also use the provided JdbcRowDataLookupFunction directly. *From the volunteer-compiled Flink mailing list archive
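For reference, below is a minimal, simplified sketch of the retry-and-reconnect pattern the built-in lookup function follows (class, field, and method names here are illustrative, not the exact Flink source). The key point is that when a lookup attempt fails, the connection is re-opened and the statement re-prepared before retrying; retrying against a stale server-side statement handle is exactly what surfaces Avatica's NoSuchStatementException seen in the log above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative names only; see the linked Flink source for the real class.
public class LookupRetrySketch {

    private final String jdbcUrl;    // e.g. a Phoenix thin-client URL (illustrative)
    private final String query;      // the dimension-table lookup query
    private final int maxRetryTimes; // e.g. 3, matching the log above

    private Connection connection;
    private PreparedStatement statement;

    public LookupRetrySketch(String jdbcUrl, String query, int maxRetryTimes) {
        this.jdbcUrl = jdbcUrl;
        this.query = query;
        this.maxRetryTimes = maxRetryTimes;
    }

    public void open() throws SQLException {
        establishConnectionAndStatement();
    }

    // Called once per incoming row, like LookupFunction#eval.
    public void eval(Object key) throws Exception {
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                statement.clearParameters();
                statement.setObject(1, key);
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        // ... convert the row and collect it ...
                    }
                }
                return;
            } catch (SQLException e) {
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }
                // Re-open the connection and RE-PREPARE the statement before the
                // next attempt, so no stale statement handle is reused.
                establishConnectionAndStatement();
                Thread.sleep(1000L * retry); // simple backoff between retries
            }
        }
    }

    private void establishConnectionAndStatement() throws SQLException {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // best-effort close of the broken connection
            }
        }
        connection = DriverManager.getConnection(jdbcUrl);
        statement = connection.prepareStatement(query);
    }
}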
More answers to this question can be viewed here:
https://developer.aliyun.com/ask/370097?spm=a2c6h.13066369.question.61.33bf585f8HkZzq
Question 2: When using StreamingFileSink, how can Hive read data that is still in the checkpoint (in-progress) state?
I am using StreamingFileSink to write data to HDFS in row format:

StreamingFileSink<String> streamingFileSink = StreamingFileSink
        .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(bucketAssigner)
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(30))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .withOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartSuffix(partSuffix)
                        .build())
        .build();

With this configuration and a checkpoint every 10 minutes, the files on HDFS stay unfinished for the whole half hour, e.g. .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446 remains in-progress. Since my checkpoint interval is 10 minutes, if the job dies at minute 29 that file will never reach the FINISHED state, so how should I handle those 20 minutes of data? Under the default handling, Hive simply filters out in-progress files; if I rename the file to a normal name, the data can be read. *From the volunteer-compiled Flink mailing list archive
Reference answer:
You need to use the on-checkpoint rolling policy (OnCheckpointRollingPolicy), which rolls the file on every checkpoint; see the sketch below. *From the volunteer-compiled Flink mailing list archive
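A minimal sketch of that change, based on the snippet from the question (assuming Flink 1.11's StreamingFileSink API; path and bucketAssigner are the same variables as above). With this policy the sink rolls the part file at every checkpoint, so once a checkpoint completes the covered file is committed under its final name, and no data sits in-progress waiting for the one-hour rollover or 30-minute inactivity interval:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

StreamingFileSink<String> streamingFileSink = StreamingFileSink
        .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(bucketAssigner)
        // Roll on every checkpoint: in-progress files are finalized as soon as
        // the checkpoint that covers them completes.
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

Note that StreamingFileSink only commits files when a checkpoint completes; records written after the last successful checkpoint are not visible to downstream readers such as Hive, which is why renaming in-progress files by hand can expose data that was never committed.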
More answers to this question can be viewed here:
https://developer.aliyun.com/ask/370098?spm=a2c6h.13066369.question.62.33bf585fSFtcXd
Question 3: Flink SQL writing to Elasticsearch does not support username/password authentication; what can be done?
I checked the official documentation; writing to Elasticsearch with SQL currently does not support username/password authentication. Is there a relatively simple workaround, other than using the API?
Thanks! *From the volunteer-compiled Flink mailing list archive
Reference answer:
Do you mean Elasticsearch's X-Pack authentication, where you need to load a certificate file? *From the volunteer-compiled Flink mailing list archive
More answers to this question can be viewed here:
https://developer.aliyun.com/ask/370099?spm=a2c6h.13066369.question.65.33bf585fAHh2HL
Question 4: A stateful operator fails to save its checkpoint
As the title says. *From the volunteer-compiled Flink mailing list archive
Reference answer:
I ran into this recently too, after upgrading from 1.10. One job could save checkpoints while another kept failing. I checked the jars under the lib directory and found that some of them had not been upgraded, so I replaced them; I also changed state.checkpoints.dir in the configuration file.
Hope this helps. *From the volunteer-compiled Flink mailing list archive
More answers to this question can be viewed here:
https://developer.aliyun.com/ask/370100?spm=a2c6h.13066369.question.64.33bf585fJRIH3C
Question 5: Can flink submit currently specify extra dependency jars?
We have packaged our Flink SQL logic into a jar, and the SQL is passed in dynamically as a parameter,
so we need to pull in dependency jars dynamically, such as UDF jars and connector jars.
Since different jobs depend on different jars, putting them all under the flink lib directory is not suitable (they may conflict).
So I'd like to ask whether there is a way to specify a job's dependency jars at submit time, similar to spark-submit's --jars.
If not, is there a related issue we can follow? *From the volunteer-compiled Flink mailing list archive
Reference answer:
There is an issue [1] tracking the ability to add jars when creating a UDF. PS: In my internal build, I extended an ADD Dependency-like syntax: when the job is submitted and run, the jars are loaded onto the job's classpath, so users can register UDFs and define their own connectors directly in SQL, though this is non-standard SQL.
[1] https://issues.apache.org/jira/browse/FLINK-14055 *From the volunteer-compiled Flink mailing list archive
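As a stopgap until a SQL-level solution lands, the flink run CLI already has a -C/--classpath option that adds a URL to every user-code classloader; a hedged example follows (the jar paths and the sql-runner jar name are illustrative). Note that -C does not ship the file, so the URL must be reachable from all nodes, e.g. on shared storage; on YARN, the -yt/--yarnship option can ship a local directory instead:

flink run \
  -C file:///opt/flink-deps/my-udf.jar \
  -C file:///opt/flink-deps/custom-connector.jar \
  sql-runner.jar --sql "INSERT INTO ..."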
More answers to this question can be viewed here:
https://developer.aliyun.com/ask/370101?spm=a2c6h.13066369.question.67.33bf585fyYWUzf