Neither the source Oracle table nor the target MySQL table has a "file" column:
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
Environment: Flink 1.16.0, CDC 2.3.0, Oracle 11g, MySQL 5.7
Full log:
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 1 tasks will be restarted to recover the failed task 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_176.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RUNNING to RESTARTING.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 9c57f249480d040657046b7ad13d34b5
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#176) of source Source: O_TB_TEST[1].
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RESTARTING to RUNNING.
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 9c57f249480d040657046b7ad13d34b5 from Checkpoint 97 @ 1678788146099 for 9c57f249480d040657046b7ad13d34b5 located at .
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from CREATED to SCHEDULED.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from SCHEDULED to DEPLOYING.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (attempt #177) with attempt id 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:37022-bfd87b @ localhost (dataPort=44033) with allocation id be462b7f02a3e0e4100f5cfe0d229b65
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 9c57f249480d040657046b7ad13d34b5: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 97 for source Source: O_TB_TEST[1] to checkpoint.
2023-03-14 18:02:30,943 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from DEPLOYING to INITIALIZING.
2023-03-14 18:02:31,043 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: O_TB_TEST[1] registering reader for parallel task 0 (#177) @ localhost
2023-03-14 18:02:31,062 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from INITIALIZING to RUNNING.
2023-03-14 18:02:33,705 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from RUNNING to FAILED on localhost:37022-bfd87b @ localhost (dataPort=44033).
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:325) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:214) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:143) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:60) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(LogMinerQueryResultProcessor.java:336) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.getTableForDmlEvent(LogMinerQueryResultProcessor.java:323) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:257) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:280) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Flink SQL> SET execution.checkpointing.interval = 10s;
Flink SQL> CREATE TABLE O_TB_TEST (
    ID STRING,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'ip',
    'port' = '1521',
    'username' = 'username',
    'password' = 'pswd',
    'database-name' = 'dbname',
    'schema-name' = 'schemaname',
    'table-name' = 'TB_TEST',
    'scan.startup.mode' = 'initial',
    'debezium.snapshot.mode' = 'schema_only',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    'debezium.database.connection.adapter' = 'logminer'
);
Flink SQL> CREATE TABLE M_MY_TABLE (
    ID STRING,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector.type' = 'jdbc',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.url' = 'jdbc:mysql://ip/RSTEST',
    'connector.table' = 'MY_TABLE',
    'connector.username' = 'username',
    'connector.password' = 'pswd'
);
Flink SQL> INSERT INTO M_MY_TABLE (ID, NAME) SELECT ID, NAME FROM O_TB_TEST;
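A side note on the sink DDL above: it uses the legacy 'connector.type' option style. On Flink 1.16 the maintained JDBC connector is configured with 'connector' = 'jdbc' options instead; an untested sketch of a roughly equivalent sink definition, keeping the same placeholder host and credentials, might look like this:

CREATE TABLE M_MY_TABLE (
    ID STRING,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',                   -- current JDBC connector factory
    'driver' = 'com.mysql.cj.jdbc.Driver',  -- com.mysql.jdbc.Driver is the legacy driver class
    'url' = 'jdbc:mysql://ip/RSTEST',
    'table-name' = 'MY_TABLE',
    'username' = 'username',
    'password' = 'pswd'
);

This is unrelated to the "file" error itself, but the newer option style is the one documented for recent flink-connector-jdbc releases.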
The problem is probably that, while processing incremental data, the Flink CDC connector tried to read a field named "file" that does not exist in the record it was handed, which raised a DataException. Note that in the stack trace the lookup fails inside JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue, i.e. while the connector is building a schema-change event from the LogMiner stream, not while reading the table's own columns.
To resolve this, you can try the following:
Some field name is wrong; check it. You can try remapping the "file" column to a valid MySQL column name in your Flink job (see the sketch just below this answer).
This error usually indicates that a field name in the incremental data stream is invalid.
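Purely for illustration, a remapping like the one suggested above can be done in plain Flink SQL by exposing the column in the source DDL and aliasing it in the INSERT. Everything involving the FILE column here is hypothetical (the poster's tables have no such column), and the sink is assumed to declare a FILE_NAME column:

-- Hypothetical source table that actually carries a FILE column
CREATE TABLE O_TB_TEST_F (
    ID STRING,
    NAME STRING,
    `FILE` STRING,  -- backticks in case the name clashes with a reserved word
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'ip',
    'port' = '1521',
    'username' = 'username',
    'password' = 'pswd',
    'database-name' = 'dbname',
    'schema-name' = 'schemaname',
    'table-name' = 'TB_TEST',
    'scan.startup.mode' = 'initial'
);

-- Rename the column on the way into the sink (which must declare FILE_NAME)
INSERT INTO M_MY_TABLE (ID, NAME, FILE_NAME)
SELECT ID, NAME, `FILE` AS FILE_NAME FROM O_TB_TEST_F;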
OP, did you ever solve this? I'm hitting the same problem: the Oracle full snapshot runs fine, but the same error is thrown once the job switches to incremental writes. Flink 1.14, CDC 2.3.0, Oracle 11g.
From the error message, it looks like the Flink CDC connector ran into a problem while syncing data from Oracle to MySQL; one possible cause is that the Oracle database contains a column named "file", and that name is not valid on the MySQL side.
You could try remapping the "file" column to a valid MySQL column name, for example "file_name" or "file_path" instead of "file", which should avoid the problem.
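For illustration, such a rename on the MySQL side could look like the statement below. It is hypothetical on two counts: the poster says MY_TABLE has no "file" column, and the VARCHAR(255) definition is a placeholder (MySQL's CHANGE COLUMN requires restating the full column definition):

ALTER TABLE MY_TABLE CHANGE COLUMN `file` file_name VARCHAR(255);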