开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC taskmanager设置20g内存 都oom,谁碰到过这个问题?

Flink CDC 2023-11-16 18:08:04,831 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:3332) ~[?:1.8.0_301]
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) ~[?:1.8.0_301]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:649) ~[?:1.8.0_301]
at java.lang.StringBuffer.append(StringBuffer.java:381) ~[?:1.8.0_301]
at java.io.StringWriter.write(StringWriter.java:77) ~[?:1.8.0_301]
at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.StrictCharacterStreamJsonWriter.writeString(StrictCharacterStreamJsonWriter.java:177) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonStringConverter.convert(JsonStringConverter.java:22) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonStringConverter.convert(JsonStringConverter.java:19) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.json.JsonWriter.doWriteString(JsonWriter.java:220) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.writeString(AbstractBsonWriter.java:607) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipeValue(AbstractBsonWriter.java:832) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipeDocument(AbstractBsonWriter.java:803) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.AbstractBsonWriter.pipe(AbstractBsonWriter.java:758) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.codecs.RawBsonDocumentCodec.encode(RawBsonDocumentCodec.java:45) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at org.bson.RawBsonDocument.toJson(RawBsonDocument.java:329) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1005/1757703759.accept(Unknown Source) ~[?:?]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_301]
at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1082) ~[?:1.8.0_301]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.producer.AvroSchemaAndValueProducer.get(AvroSchemaAndValueProducer.java:39) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask.createSourceRecord(MongoSourceTask.java:305) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask.lambda$poll$0(MongoSourceTask.java:274) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.mongodb.kafka.connect.source.MongoSourceTask$$Lambda$1002/4546824.accept(Unknown Source) ~[?:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_301]
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:265) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
at com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.poll(MongoDBConnectorSourceTask.java:123) ~[flink-sql-connector-mongodb-cdc-2.4.1.jar:2.4.1]
taskmanager设置20g内存 都oom,谁碰到过这个问题,mongodb cdc 到 kafka?

展开
收起
真的很搞笑 2023-11-22 09:26:41 264 0
3 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据您提供的错误信息,这是一个 OutOfMemoryError 错误,表示任务无法获取足够的内存来完成任务。
    在 Flink CDC 中,您可以增加 TaskManager 的内存限制来缓解此问题。
    您可以在 TaskManager 的 conf/flink-conf.yaml 文件中添加如下配置:

    taskmanager.memory.process.size: 20g
    taskmanager.numberOfTaskSlots: 1
    

    这将设置 TaskManager 内存为 20GB,并将 TaskManager 分配为一个槽位。
    您还可以在每个 Task 中设置最大内存限制,例如:

    SET 'execution.runtime-config.taskmanager.default slot.parallelism': 1;
    SET 'taskmanager.network.memory.fraction': 0.5;
    SET 'taskmanager.memory.segment-size': 500m;
    

    这将使每个 Task 最大可用内存为 10GB,并将 Task 的最小可用内存设为 500MB。

    2023-11-29 13:38:44
    赞同 1 展开评论 打赏
  • 这个问题可能是由于Flink CDC在处理大量数据时,内存占用过高导致的。你可以尝试以下方法来解决这个问题:

    1. 增加TaskManager的内存分配。在Flink配置文件(flink-conf.yaml)中,找到taskmanager.memory.process.sizetaskmanager.memory.fraction这两个参数,适当增加它们的值。例如:
    taskmanager.memory.process.size: 4g
    taskmanager.memory.fraction: 0.8
    
    1. 调整Flink CDC的配置。在Flink CDC的配置文件(如application.properties或application.yaml)中,可以设置一些参数来优化内存使用。例如,可以设置maxChangesPerTransaction参数来限制每个事务中的变更数量,以减少内存占用:
    # application.properties
    flink.cdc.connector.mongodb.maxChangesPerTransaction: 1000
    
    1. 如果问题仍然存在,可以考虑将数据分批处理。在Flink CDC的配置中,可以设置batchSize参数来控制每次从MongoDB读取的数据量。这样可以减少单次处理的数据量,从而降低内存占用。例如:
    # application.properties
    flink.cdc.connector.mongodb.batchSize: 1000
    

    尝试以上方法后,如果问题仍然存在,可能需要进一步分析数据和配置,以找到更合适的解决方案。

    2023-11-29 12:01:22
    赞同 展开评论 打赏
  • 根据堆栈信息来看,你的Flink任务在尝试序列化MongoDB中的大型文档时遇到了内存溢出问题。由于转换器尝试将其转换为Avro格式以发送到Kafka,而这个过程涉及到创建一个非常大的字符串,最终导致内存溢出。
    有几种可能的解决方案可以解决这个问题:

    1. 增加TaskManager的内存分配。这是一个临时的解决方案,可能会缓解问题,但并不是长期有效的解决方案,因为它不能解决根本原因。
    2. 尝试使用更高效的序列化方式,比如使用BinaryRowData而不是Avro格式。BinaryRowData是一种二进制编码格式,它比Avro更紧凑并且可以减少内存消耗。
    3. 在你的MongoDB查询中添加适当的限制,只返回必要的字段。这将有助于减小序列化的数据量。
    2023-11-24 10:21:44
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载