开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC里这种情况是不是因为存在反压导致的呢 ?

Flink CDC里Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 ,Caused by: java.net.SocketException: Connection reset,mysql cdc 过2天就报错,这种情况是不是因为存在反压导致的呢 ?现在抽取差不多100张表 数据量有时候突然增长有点大 有没有可能是反压造程的源端mysql连接中断了 让dba 看 dba说没网络没问题 业务系统一直用的很稳定。Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1704708007000, eventType=UPDATE_ROWS, serverId=47613306, headerLength=19, dataLength=11255627, nextPosition=272860969, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:154) ~[?:1.8.0_232]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_232]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:68) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:266) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:245) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.io.InputStream.skip(InputStream.java:224) ~[?:1.8.0_232]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:285) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:337) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ~[sdlw-flink-mysql-FlinkCdcMysqlToKafka.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]

展开
收起
小小鹿鹿鹿 2024-01-17 23:51:07 61 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    从日志中可以看出,这个问题可能是由于MySQL的binlog反压导致的。当数据量突然增长时,MySQL服务器可能会限制客户端的连接数,导致连接被重置。这可能是由于MySQL的配置参数max_connections设置得过低,或者网络带宽不足等原因导致的。

    为了解决这个问题,你可以尝试以下方法:

    1. 检查MySQL的配置参数max_connections,确保其值足够大以应对数据量的增长。你可以通过以下命令查看和修改max_connections的值:
    SHOW VARIABLES LIKE 'max_connections';
    

    如果需要修改,可以使用以下命令:

    SET GLOBAL max_connections = 新的值;
    
    1. 检查网络带宽是否足够支持数据量的传输。如果网络带宽不足,可能会导致连接被重置。你可以尝试优化网络环境,或者增加带宽。

    2. 如果问题仍然存在,可以考虑使用其他的数据同步工具,如Debezium、Canal等,它们可能对反压的处理更加稳定。

    2024-01-18 14:33:17
    赞同 展开评论 打赏
  • 看网络系统是否稳定。或者没必要hash,主键+库名+表名作为kafka主键放在一个分区就行了,并行消费也没得问题。此回答来自钉群Flink CDC 社区。

    2024-01-18 10:36:39
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载