Flink在第一个Sink失败的情况下 导致 在switch source算子运行状态 从running变成 cancelling 失败
报错 是因为 ResultSet.close()方法阻塞住了, 然后超过 task.cancellation.timeout 设置的时间 taskmanager就宕机了 有大佬遇到过嘛?Source(Mysql ) -> filter -> Sink(Mysql)
-> filter -> SinkWARN org.apache.flink.runtime.taskmanager.Task [] - Task 'DataSource (PFFF7AF61-7336-EF96-F823-31CE49B07791) (1/1)#0' did not react to cancelling signal - interrupting; it is stuck for 30 seconds in method:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:100)
com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:143)
com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:173)
com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2911)
com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3382)
com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:870)
com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1928)
com.mysql.jdbc.RowDataDynamic.nextRecord(RowDataDynamic.java:378)
com.mysql.jdbc.RowDataDynamic.next(RowDataDynamic.java:358)
com.mysql.jdbc.RowDataDynamic.close(RowDataDynamic.java:158)
com.mysql.jdbc.ResultSetImpl.realClose(ResultSetImpl.java:6703)
com.mysql.jdbc.ResultSetImpl.close(ResultSetImpl.java:848)
*.close(RDBInputFormat.java:433)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:207)
org.apache.flink.runtime.taskmanager.Task$$Lambda$671/731154499.run(Unknown Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
java.lang.Thread.run(Thread.java:745)
这个问题可能是由于在第一个Sink失败后,Flink尝试关闭ResultSet时阻塞了。你可以尝试增加task.cancellation.timeout的值来解决这个问题
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。