Flink CDC 2023-11-16 18:08:04,831 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:3332) ~[?:1.8.0_301]
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) ~[?:1.8.0_301]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:649) ~[?:1.8.0_301]
at java.lang.StringBuffer.append(StringBuffer.java:381) ~[?:1.8.0_301]
at java.io.StringWriter.write(StringWriter.java:77) ~[?:1.8.0_301]
at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.StrictCharacterStreamJsonWriter.writeString(StrictCharacterStreamJsonWriter.java:177) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonStringConverter.convert(JsonStringConverter.java:22) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonStringConverter.convert(JsonStringConverter.java:19) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonWriter.doWriteString(JsonWriter.java:220) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.writeString(AbstractBsonWriter.java:607) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipeValue(AbstractBsonWriter.java:832) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipeDocument(AbstractBsonWriter.java:803) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipe(AbstractBsonWriter.java:758) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.codecs.RawBsonDocumentCodec.encode(RawBsonDocumentCodec.java:45) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.RawBsonDocument.toJson(RawBsonDocument.java:329) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1005/1757703759.accept(Unknown Source) ~[?:?]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_301]
at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1082) ~[?:1.8.0_301]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.producer.AvroSchemaAndValueProducer.get(AvroSchemaAndValueProducer.java:39) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask.createSourceRecord(MongoSourceTask.java:305) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask.lambda$poll$0(MongoSourceTask.java:274) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask$$Lambda$1002/4546824.accept(Unknown Source) ~[?:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_301]
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:265) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.poll(MongoDBConnectorSourceTask.java:123) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
taskmanager设置20g内存 都oom,谁碰到过这个问题,mongodb cdc 到 kafka?
根据您提供的错误信息,这是一个 OutOfMemoryError 错误,表示任务无法获取足够的内存来完成任务。
在 Flink CDC 中,您可以增加 TaskManager 的内存限制来缓解此问题。
您可以在 TaskManager 的 conf/flink-conf.yaml
文件中添加如下配置:
taskmanager.memory.process.size: 20g
taskmanager.numberOfTaskSlots: 1
这将设置 TaskManager 内存为 20GB,并将 TaskManager 分配为一个槽位。
您还可以在每个 Task 中设置最大内存限制,例如:
SET 'execution.runtime-config.taskmanager.default slot.parallelism': 1;
SET 'taskmanager.network.memory.fraction': 0.5;
SET 'taskmanager.memory.segment-size': 500m;
这将使每个 Task 最大可用内存为 10GB,并将 Task 的最小可用内存设为 500MB。
这个问题可能是由于Flink CDC在处理大量数据时,内存占用过高导致的。你可以尝试以下方法来解决这个问题:
taskmanager.memory.process.size
和taskmanager.memory.fraction
这两个参数,适当增加它们的值。例如:taskmanager.memory.process.size: 4g
taskmanager.memory.fraction: 0.8
maxChangesPerTransaction
参数来限制每个事务中的变更数量,以减少内存占用:# application.properties
flink.cdc.connector.mongodb.maxChangesPerTransaction: 1000
batchSize
参数来控制每次从MongoDB读取的数据量。这样可以减少单次处理的数据量,从而降低内存占用。例如:# application.properties
flink.cdc.connector.mongodb.batchSize: 1000
尝试以上方法后,如果问题仍然存在,可能需要进一步分析数据和配置,以找到更合适的解决方案。
根据堆栈信息来看,你的Flink任务在尝试序列化MongoDB中的大型文档时遇到了内存溢出问题。由于转换器尝试将其转换为Avro格式以发送到Kafka,而这个过程涉及到创建一个非常大的字符串,最终导致内存溢出。
有几种可能的解决方案可以解决这个问题:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。