
Flink CDC: full snapshot works, but the incremental phase fails with "file is not a valid field name"

Neither the source Oracle database nor the target MySQL database has a column named "file":

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name

Environment: Flink 1.16.0, Flink CDC 2.3.0, Oracle 11g, MySQL 5.7

Full log:

2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 1 tasks will be restarted to recover the failed task 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_176.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RUNNING to RESTARTING.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 9c57f249480d040657046b7ad13d34b5
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#176) of source Source: O_TB_TEST[1].
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RESTARTING to RUNNING.
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 9c57f249480d040657046b7ad13d34b5 from Checkpoint 97 @ 1678788146099 for 9c57f249480d040657046b7ad13d34b5 located at .
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from CREATED to SCHEDULED.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from SCHEDULED to DEPLOYING.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (attempt #177) with attempt id 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:37022-bfd87b @ localhost (dataPort=44033) with allocation id be462b7f02a3e0e4100f5cfe0d229b65
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 9c57f249480d040657046b7ad13d34b5: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 97 for source Source: O_TB_TEST[1] to checkpoint.
2023-03-14 18:02:30,943 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from DEPLOYING to INITIALIZING.
2023-03-14 18:02:31,043 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: O_TB_TEST[1] registering reader for parallel task 0 (#177) @ localhost
2023-03-14 18:02:31,062 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from INITIALIZING to RUNNING.
2023-03-14 18:02:33,705 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from RUNNING to FAILED on localhost:37022-bfd87b @ localhost (dataPort=44033).
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:325) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:214) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:143) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:60) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(LogMinerQueryResultProcessor.java:336) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.getTableForDmlEvent(LogMinerQueryResultProcessor.java:323) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:257) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:280) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more

Flink SQL> SET execution.checkpointing.interval = 10s;

Flink SQL> CREATE TABLE O_TB_TEST (
  ID STRING,
  NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'ip',
  'port' = '1521',
  'username' = 'username',
  'password' = 'pswd',
  'database-name' = 'dbname',
  'schema-name' = 'schemaname',
  'table-name' = 'TB_TEST',
  'scan.startup.mode' = 'initial',
  'debezium.snapshot.mode' = 'schema_only',
  'debezium.log.mining.strategy' = 'online_catalog',
  'debezium.log.mining.continuous.mine' = 'true',
  'debezium.database.connection.adapter' = 'logminer'
);

Flink SQL>
CREATE TABLE M_MY_TABLE (
  ID STRING,
  NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.url' = 'jdbc:mysql://ip/RSTEST',
  'connector.table' = 'MY_TABLE',
  'connector.username' = 'username',
  'connector.password' = 'pswd'
);

Flink SQL> INSERT INTO M_MY_TABLE (ID, NAME) SELECT ID, NAME FROM O_TB_TEST;

2336uk34dpiqc · 2023-03-14 18:15:16
7 Answers
  • 北京阿里云ACE会长

    The error most likely means that, while processing incremental changes, the Flink CDC connector tried to read a field named "file" from the change event, and no such field exists on the Oracle side, which raises the DataException.
    To work around it, you can try the following (for item 3, see the sketch after this list):

    1. Check the schema of O_TB_TEST and confirm that it has no column named "file". If it does, drop or rename that column.
    2. If O_TB_TEST really has no "file" column, review the Flink CDC connector configuration. You set 'debezium.log.mining.strategy' = 'online_catalog' and 'debezium.log.mining.continuous.mine' = 'true', which makes the connector mine incremental changes against the database's online catalog. Make sure that setup is correct for your Oracle instance.
    3. If the problem persists, try switching 'debezium.log.mining.strategy' to 'redo_log_catalog', the other mining strategy Debezium's Oracle connector supports: it writes the data dictionary to the redo logs so that schema changes can be resolved during mining, which may reduce the problems caused by the invalid field.
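
    A minimal sketch of the adjusted source DDL, assuming everything else from the question stays unchanged (hostname, credentials, and object names are the placeholders from the original post):

    Flink SQL> CREATE TABLE O_TB_TEST (
      ID STRING,
      NAME STRING,
      PRIMARY KEY (ID) NOT ENFORCED
    ) WITH (
      'connector' = 'oracle-cdc',
      'hostname' = 'ip',
      'port' = '1521',
      'username' = 'username',
      'password' = 'pswd',
      'database-name' = 'dbname',
      'schema-name' = 'schemaname',
      'table-name' = 'TB_TEST',
      'scan.startup.mode' = 'initial',
      'debezium.snapshot.mode' = 'schema_only',
      -- changed: resolve the dictionary from the redo logs instead of the online catalog
      'debezium.log.mining.strategy' = 'redo_log_catalog',
      -- if mining fails with this combination, try removing the continuous-mine option
      'debezium.log.mining.continuous.mine' = 'true',
      'debezium.database.connection.adapter' = 'logminer'
    );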
    2024-01-05 09:03:00
  • [Answer posted as a screenshot: 企业微信截图_16893192292780.png]

    2023-07-14 15:28:31
  • One of your field names looks problematic; double-check them. You can also try adjusting the Flink CDC connector configuration to map the "file" column to a valid MySQL column name.

    2023-06-30 17:08:34
  • CSDN top full-stack creator and blogger with 10k+ followers; InfoQ contracted blogger; Huawei Cloud expert; Huawei IoT expert; Guinness World Record holder in Amazon AI autonomous driving (mass-market category)

    This error usually means that a field name in the incremental source is invalid. Check the following (for item 4, see the sketch after this list):

    1. Confirm whether your incremental source contains a field named "file", and whether that field is of string type.
    2. Confirm that your Flink CDC configuration correctly specifies the path and format of the incremental source. For example, if you use Kafka as the incremental source, the topic and partition information must be configured.
    3. Confirm that your Flink CDC configuration correctly specifies the path and format of the full (snapshot) source. If the full source uses a different format or path, specify the matching format and path for the incremental part as well.
    4. If none of the above resolves it, enable debug logging in Flink CDC and inspect the detailed error message and stack trace to pinpoint the problem.
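
    For item 4, a minimal sketch of raising the connector log level in the Flink distribution's conf/log4j.properties (this assumes the Log4j 2 properties layout that ships with Flink; the logger names are simply the connector packages visible in the stack trace above):

    # conf/log4j.properties: surface DEBUG output from the CDC/Debezium code paths
    logger.debezium.name = io.debezium
    logger.debezium.level = DEBUG
    logger.cdc.name = com.ververica.cdc
    logger.cdc.level = DEBUG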
    2023-05-20 14:10:45
  • OP, did you ever solve this? I'm hitting the same issue: the Oracle full snapshot is fine, but the same error appears during incremental writes. Flink 1.14, CDC 2.3.0, Oracle 11g.

    2023-03-27 10:50:13
  • Sharing freely; friendly discussion welcome :)

    This error suggests that the Flink CDC connector ran into trouble while syncing data from Oracle to MySQL, possibly because the Oracle database contains a column named "file", which is not a valid column name on the MySQL side.

    You can try adjusting the Flink CDC connector configuration to map the "file" column to a valid MySQL column name, for example "file_name" or "file_path" instead of "file"; that should avoid the problem.
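
    A minimal sketch of that mapping, entirely hypothetical since the original post says neither database actually has a "file" column (the FILE_NAME sink column and the backtick-quoted source column are illustrative):

    Flink SQL> INSERT INTO M_MY_TABLE (ID, NAME, FILE_NAME)
    SELECT ID, NAME, `file` AS FILE_NAME  -- hypothetical column, quoted in case "file" collides with a keyword
    FROM O_TB_TEST;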

    2023-03-16 18:07:55
  • (OP) Syncing data from Oracle to MySQL: after the job starts, the full snapshot completes successfully, but subsequent inserts, updates, and deletes trigger the error above.

    2023-03-14 18:17:21
