开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

hello,请教下,我有个flinkcdc任务,同步mysql中的一张表到kafka,表量级不大?

hello,请教下,我有个flinkcdc任务,同步mysql中的一张表到kafka,表量级不大不到3万条,但是有个字段存储字符串较长,启动flinkcdc程序后一致OOM,请问有遇到过么?java.lang.runtimeexception: One or more fetchers have encountered exceptionat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at java.lang.Thread.run(Thread.java:750)Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the recordsat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)... 1 moreCaused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=BBB.AAA, splitId='BBB.AAA:2', splitKeyType=[spu_product_spu_id DECIMAL(20, 0) NOT NULL], splitStart=[156969], splitEnd=null, highWatermark=null} error due to com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Snapshotting of table BBB.AAA failed.at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:325)at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:257)at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:80)at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)... 6 moreCaused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Snapshotting of table BBB.AAA failedat com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:123)at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$1(SnapshotSplitReader.java:136)... 3 moreCaused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Snapshotting of table BBB.AAA failedat com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:265)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:191)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:153)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:118)... 4 moreCaused by: java.sql.SQLException: Java heap spaceat com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:228)... 7 moreCaused by: java.lang.OutOfMemoryError: Java heap space

展开
收起
cuicuicuic 2023-08-02 09:24:55 130 0
1 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    错误是由于Flink CDC任务在同步MySQL表到Kafka时消耗了过多的内存资源导致的。这种情况可能是由于以下原因之一造成的:

    计算资源不足:Flink CDC任务可能分配的计算资源不足以处理大量数据或处理复杂的转换操作。您可以尝试增加任务的计算资源,如内存和CPU,并确保集群配置足够满足任务的需求。

    数据量过大:虽然您提到表的规模不大,但是如果表中包含有较长的字符串字段,可能会导致内存消耗增加。长字符串在内存中占用的空间较大,特别是在进行数据转换和处理时。您可以尝试调整Flink任务的内存配置,增加可用的堆内存大小。

    内存管理配置不当:Flink的内存管理配置可能需要调整以适应任务的需求。您可以检查Flink的内存管理配置参数,如堆内存大小、堆外内存大小、内存分配策略等,并根据实际情况进行调整。

    数据处理算子问题:如果您在Flink任务中使用了自定义的数据处理算子,可能存在内存消耗较高的问题。您可以检查数据处理算子的实现,优化代码逻辑,减少内存占用。

    2023-08-03 21:49:04
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像