各位好: 请教一个问题 场景: 1.实时监控业务流向Kafka的binlog数据,使用Flink进行解析出来同步到对应的MySQL环境 2.业务数据并不是一直都有的,可能N天才有一条,使用Flink就是想第一时间同步到MySQL环境,便于后续使用,降低业务影响 3.Flink使用的版本为:1.10.0
问题: 1.由于业务binlog数据不定时出现,导致Flink sink to MySQL的链接N天就会出现问题,报下面的问题(已使用MySQL pool) 2.错误信息如下: Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet successfully received from the server was 159,043,262 milliseconds ago. The last packet sent successfully to the server was 159,043,262 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:988) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3739) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2508) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073) at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009) at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5098) at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994) at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98) at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98) at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:34) at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:17) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SinkConversion$31.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at StreamExecCalc$28.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SourceConversion$1.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) Caused by: java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3721) ... 59 more 说明: 针对上述问题,之前查证过 1.针对版本不同,可以在URL中添加对应的参数(4.X需要,5.X不要[个人使用的是5.X])(无效) 2.使用对应MySQL pool进行处理(目前已使用,但是还会出问题)(求指点) 问: 是否有什么方式可以处理上述问题*来自志愿者整理的flink邮件归档
Hi, 你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。 由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。