在使用flink cdc pg时 有遇到这个问题吗?
] -> TableToDataSteam (1/1)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: 269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: 173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java: 336)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java: 228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java: 213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java: 192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java: 718)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java: 351)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java: 1318)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java: 93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java: 1306)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java: 1191)
... 14 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Call snapshotState() on failed source, checkpoint failed.
at com.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java: 311)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java: 118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java: 99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java: 88)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: 222)
... 25 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java: 50)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java: 116)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: 511)
at java.util.concurrent.FutureTask.run(FutureTask.java: 266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java: 1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 617)
... 1 more
Caused by: io.debezium.DebeziumException: java.lang.RuntimeException: org.postgresql.util.PSQLException: This connection has been closed.
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java: 85)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java: 155)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java: 137)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java: 109)
... 5 more
Caused by: java.lang.RuntimeException: org.postgresql.util.PSQLException: This connection has been closed.
at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java: 548)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java: 146)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java: 76)
... 8 more
Caused by: org.postgresql.util.PSQLException: This connection has been closed.
at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java: 907)
at org.postgresql.jdbc.PgConnection.rollback(PgConnection.java: 914)
at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java: 545)
... 10 more
我看了一下代码里面没有任何一个地方去关闭了这个数据库连接 我猜测是数据库或者是jdbc自动去close的这个连接
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。