这个问题困扰我好久了? 查阅资料是debezium的bug么 报错说是 INT64 类型转换错误?加了很多防止解析报错也没用
组件 | 版本 |
---|---|
flink | 13.6 |
oracle-cdc | 2.3.0 |
oracle | 11g |
原表字段:
ID NUMBER(10) not null primary key, NUM NUMBER(10) not null
作业代码:
CREATE TABLE ac_paymentbooks_cdc(
ID DECIMAL(10) ,
NUM DECIMAL(10) ,
。。。
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'xxx',
'port' = 'xxx',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'schema-name' = 'xxx',
'table-name' = 'xxx',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true',
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.decimal.handling.mode' = 'double',
'debezium.event.processing.failure.handling.mode' = 'skip',
'debezium.json.debezium.ignore-parse-errors' = 'true',
'debezium.database.history.kafka.topic' = 'test1',
'scan.startup.mode' = 'latest-offset'
);
完整的异常栈
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:325)
at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123)
at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71)
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86)
... 5 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.oracle.OracleDatabaseSchema.lambda$applySchemaChange$0(OracleDatabaseSchema.java:73)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:72)
at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:208)
at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113)
at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:143)
at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:60)
at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(LogMinerQueryResultProcessor.java:336)
at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.getTableForDmlEvent(LogMinerQueryResultProcessor.java:323)
at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:257)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:280)
... 8 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 32 more
可能是由于数据类型的不匹配所致。你可以尝试下面这种方法来进行解决:
在 Debezium 的配置文件中启用 schema.include.list 参数,用于限定解析数据的表。
在 Flink 中使用 Cast 函数将 INT64 类型转换为 LONG 类型,或者将 LONG 类型转换为 INT 类型。
例如:
SELECT CAST(column_name AS LONG) FROM table_name;
或者
SELECT CAST(column_name AS INT) FROM table_name;
如果以上方法仍然无法解决问题,建议你查看 Debezium 和 Flink 的详细日志,分析具体出现问题的步骤,进而找到问题的根本原因并解决。
这个错误信息是 Flink 中就某个 SplitFetcher 线程的异常信息,表明该线程在从数据源中轮询数据时遇到了无法预期的异常。
SplitFetcher 负责从数据源中读取数据,并将读取到的数据传递给相应的 SourceReader,而这个异常信息可能与数据读取和处理相关。下面是一些可能导致这个错误的原因和对应的解决方法:
如果数据源中的数据出现了格式错误,可能会导致 SplitFetcher 无法正确地处理这些数据。可以检查数据源中的数据格式是否符合预期,并尝试修改数据源中的数据格式使其符合要求。同时,也可以在相关的代码中加入容错处理,避免出现异常导致程序崩溃。
网络连接问题也可能导致 SplitFetcher 出现异常。可以检查网络连接是否正常,确认 Flink 与数据源之间的通信是否受到了任何限制或干扰,例如防火墙、路由器等。
错误的应用程序配置也可能导致 SplitFetcher 出现异常,如网络连接超时时间过短、内存或线程池分配不足等。可以检查应用程序配置,确认其与数据源和实际负载相符,并进行必要调整。
如果数据源中存在过多异常数据,可能会导致 SplitFetcher 出现异常。可以检查数据源中的数据,确认其中是否存在异常数据,并将异常数据过滤或进行特殊处理。
Flink 版本与其他依赖项的版本不一致可能也导致 SplitFetcher 出现异常。可以检查 Flink 以及相应依赖项的版本,并进行必要修改或升级。
综上所述,以上是 SplitFetcher 线程出现异常的一些常见原因和解决方法。需要根据具体的错误信息和应用场景进行分析和诊断,从而找到最合适的解决方法。
根据提供的异常信息,问题可能是由于DECIMAL类型与INT64类型之间的转换引起的。在的作业代码中,可能存在以下情况:
要解决这个问题,可以尝试以下方法:
OracleSourceFunction
中添加DecimalTypeInfo.of(precision, scale)
来指定DECIMAL类型的精度和长度。如果仍然遇到问题,请检查连接字符串是否正确,并确保具有访问Oracle数据库的权限。如果问题仍然存在,请考虑查看Flink的日志文件以获取更多详细信息。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。