In Flink CDC I am hitting Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4, Caused by: java.net.SocketException: Connection reset. The MySQL CDC job errors out after about two days of running. Could this be caused by backpressure? The job currently extracts roughly 100 tables, and the data volume sometimes spikes sharply. Is it possible that backpressure caused the connection to the source MySQL to be dropped? I asked the DBA to check; the DBA says the network is fine and the business system has been running stably the whole time.
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1704708007000, eventType=UPDATE_ROWS, serverId=47613306, headerLength=19, dataLength=11255627, nextPosition=272860969, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:154) ~[?:1.8.0_232]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_232]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:68) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:266) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:245) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.io.InputStream.skip(InputStream.java:224) ~[?:1.8.0_232]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:285) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:337) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
From the log, this problem may be caused by binlog backpressure on the MySQL side. When the data volume grows suddenly, the MySQL server may limit client connections, causing the connection to be reset. This could be because the MySQL configuration parameter max_connections is set too low, or because the network bandwidth is insufficient.
To address this, you can try the following (a Flink CDC source configuration sketch follows the list):
1. Check the MySQL parameter max_connections and make sure its value is large enough to handle the growth in data volume. You can view its current value with:
SHOW VARIABLES LIKE 'max_connections';
If you need to raise it:
SET GLOBAL max_connections = <new value>;
2. Check whether the network bandwidth is sufficient for the volume of data being transferred. Insufficient bandwidth can cause connections to be reset. Try optimizing the network environment or adding bandwidth.
3. If the problem persists, consider other data synchronization tools such as Debezium or Canal, which may handle backpressure more robustly.
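Beyond the server-side checks above, you can also make the Flink CDC source's binlog connection more tolerant of hiccups. This is a minimal sketch, assuming flink-connector-mysql-cdc 2.x; the hostname, credentials, and table list are placeholders, and connect.keep.alive / connect.timeout.ms are standard Debezium MySQL connector pass-through options rather than settings taken from the original answer:

import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CdcSourceSketch {
    public static void main(String[] args) throws Exception {
        // Debezium pass-through options: keep the binlog connection alive and
        // allow more time when (re)connecting to the MySQL server.
        Properties dbzProps = new Properties();
        dbzProps.setProperty("connect.keep.alive", "true");
        dbzProps.setProperty("connect.timeout.ms", "60000");

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("mysql-host")   // placeholder
                .port(3306)
                .databaseList("mydb")     // placeholder
                .tableList("mydb.*")      // ~100 tables in the setup described above
                .username("cdc_user")     // placeholder
                .password("***")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(dbzProps)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the job resume from the saved binlog offset after a restart.
        env.enableCheckpointing(30_000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql-cdc-sketch");
    }
}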
Also check whether the network is stable. Alternatively, there is no need to hash: just use primary key + database name + table name as the Kafka message key, so that all changes for a given row land in the same partition; parallel consumption then works without problems (a sketch of this keying scheme follows). This answer is from the DingTalk group "Flink CDC Community".
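As an illustration of that keying scheme, here is a minimal sketch using Flink's KafkaSink; the ChangeEvent POJO, broker address, and topic name are made-up placeholders, not part of the original answer:

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedCdcSink {
    // Hypothetical change-event shape: source db/table, primary key, JSON payload.
    public static class ChangeEvent implements Serializable {
        public String db;
        public String table;
        public String pk;
        public String payload;
    }

    public static KafkaSink<ChangeEvent> build(String brokers, String topic) {
        return KafkaSink.<ChangeEvent>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(new KafkaRecordSerializationSchema<ChangeEvent>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            ChangeEvent e, KafkaSinkContext ctx, Long timestamp) {
                        // Key = primary key + database + table: every change to the same row
                        // hashes to the same partition, so per-row order is preserved while
                        // consumers can still read the partitions in parallel.
                        byte[] key = (e.pk + "|" + e.db + "|" + e.table)
                                .getBytes(StandardCharsets.UTF_8);
                        byte[] value = e.payload.getBytes(StandardCharsets.UTF_8);
                        return new ProducerRecord<>(topic, key, value);
                    }
                })
                .build();
    }
}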