开发者社区> 问答> 正文

Flink1.10 sink to mysql SocketException

各位好: 请教一个问题 场景: 1.实时监控业务流向Kafka的binlog数据,使用Flink进行解析出来同步到对应的MySQL环境 2.业务数据并不是一直都有的,可能N天才有一条,使用Flink就是想第一时间同步到MySQL环境,便于后续使用,降低业务影响 3.Flink使用的版本为:1.10.0

问题: 1.由于业务binlog数据不定时出现,导致Flink sink to MySQL的链接N天就会出现问题,报下面的问题(已使用MySQL pool) 2.错误信息如下: Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet successfully received from the server was 159,043,262 milliseconds ago. The last packet sent successfully to the server was 159,043,262 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:988) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3739) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2508) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073) at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009) at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5098) at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994) at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98) at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98) at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:34) at com.jiuye.mdh.fla.MySQLSinkFunc.invoke(MySQLSinkFunc.java:17) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SinkConversion$31.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at StreamExecCalc$28.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SourceConversion$1.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) Caused by: java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3721) ... 59 more 说明: 针对上述问题,之前查证过 1.针对版本不同,可以在URL中添加对应的参数(4.X需要,5.X不要[个人使用的是5.X])(无效) 2.使用对应MySQL pool进行处理(目前已使用,但是还会出问题)(求指点) 问: 是否有什么方式可以处理上述问题*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 12:26:43 562 0
1 条回答
写回答
取消 提交回答
  • Hi, 你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。 由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。*来自志愿者整理的flink

    2021-12-07 15:41:18
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像