Flink CDC里有没有遇到 这种问题啊?
2024-01-30 19:43:08,239 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 2157838595 - discarded
akka.framesize: "20480000b" 貌似已经足够大了吧?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC场景中,您遇到的错误信息如下:
Encountered error while consuming partitions org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 2157838595 - discarded
该问题表明Netty在处理网络数据帧时,检测到数据帧长度超过了允许的最大值(2147483647
,即Integer.MAX_VALUE
)。尽管您已经将akka.framesize
设置为20480000b
(约20MB),但此参数仅影响Akka框架的消息传递大小限制,而不会直接影响Netty的帧长度限制。
以下是针对该问题的详细分析和解决方案:
Netty帧长度限制
Netty默认对单个数据帧的长度有严格的限制。如果某个数据帧的大小超过2147483647
字节(约2GB),Netty会抛出TooLongFrameException
异常。
CDC数据量过大
Flink CDC在从MySQL等数据库捕获变更数据时,可能会生成较大的数据块(例如大批量的更新或删除操作)。如果这些数据块未被合理分片或压缩,可能导致单个数据帧超出Netty的限制。
配置不匹配
akka.framesize
仅适用于Akka消息传递,而Netty的帧长度限制由其他参数控制。因此,即使akka.framesize
设置得足够大,也无法解决Netty的帧长度问题。
为了缓解Netty帧长度限制问题,您可以尝试以下配置调整:
增加Netty缓冲区大小
在Flink作业的flink-conf.yaml
文件中,添加或修改以下参数:
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
这些参数用于调整TaskManager的网络缓冲区内存分配,确保能够处理更大的数据帧。
调整Netty最大帧长度
如果您的Flink版本支持自定义Netty参数,可以通过以下方式设置最大帧长度:
taskmanager.network.netty.server.max-frame-size: 2147483647
taskmanager.network.netty.client.max-frame-size: 2147483647
注意:上述参数的具体名称可能因Flink版本不同而有所变化,请参考您所使用的Flink版本文档。
如果问题的根本原因是CDC数据量过大,建议从数据源和作业逻辑层面进行优化:
分片处理
确保MySQL Binlog中的变更数据能够被合理分片。例如,避免一次性捕获过多的大批量更新操作。
启用压缩
在Flink CDC的WITH
参数中启用数据压缩功能(如'debezium.snapshot.fetch.size'
和'debezium.source.database.history.kafka.recovery.attempts'
),以减少单个数据帧的大小。
调整Checkpoint间隔
如果Checkpoint间隔过短,可能导致大量数据堆积在内存中。建议适当延长Checkpoint间隔时间:
execution.checkpointing.interval: 60000 # 单位为毫秒
某些情况下,Netty版本冲突可能导致帧长度限制异常。请检查您的Flink作业是否引入了与Flink内置Netty版本不兼容的依赖。如果存在冲突,建议通过以下方式解决:
排除冲突依赖
在pom.xml
中排除冲突的Netty依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
强制使用Flink内置Netty版本
在pom.xml
中显式指定Flink内置的Netty版本:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version> <!-- 根据Flink版本选择 -->
</dependency>
升级Flink版本
如果您使用的是较旧的Flink版本,建议升级到最新稳定版本(如Flink 1.15及以上)。新版本通常包含对Netty和CDC相关问题的优化和修复。
监控资源使用
在调整配置后,请密切监控TaskManager的内存和网络使用情况,确保不会因缓冲区过大导致OOM(Out of Memory)问题。
通过调整Netty相关参数、优化CDC数据流以及解决依赖冲突,可以有效缓解TooLongFrameException
问题。如果问题仍然存在,建议结合具体场景进一步分析日志,并考虑升级Flink版本以获得更好的兼容性和性能支持。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。