开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

报错class java.lang.String for field: "null"

这个问题困扰我好久了? 查阅资料是debezium的bug么 报错说是 INT64 类型转换错误?加了很多防止解析报错也没用

组件版本
flink13.6
oracle-cdc2.3.0
oracle11g

原表字段:

ID NUMBER(10) not null primary key, NUM NUMBER(10) not null

作业代码:

CREATE TABLE ac_paymentbooks_cdc(
    ID  DECIMAL(10)   , 
    NUM   DECIMAL(10)  , 
     。。。
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'xxx',
'port' = 'xxx',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'schema-name' = 'xxx',
'table-name' = 'xxx',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true',
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.decimal.handling.mode' = 'double',
'debezium.event.processing.failure.handling.mode' = 'skip',
'debezium.json.debezium.ignore-parse-errors' = 'true',
'debezium.database.history.kafka.topic' = 'test1',
'scan.startup.mode' = 'latest-offset'
);

完整的异常栈

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:325)
	at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123)
	at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71)
	at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86)
	... 5 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
	at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
	at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
	at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
	at io.debezium.connector.oracle.OracleDatabaseSchema.lambda$applySchemaChange$0(OracleDatabaseSchema.java:73)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:72)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:208)
	at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:143)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:60)
	at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(LogMinerQueryResultProcessor.java:336)
	at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.getTableForDmlEvent(LogMinerQueryResultProcessor.java:323)
	at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:257)
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:280)
	... 8 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.String for field: "null"
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
	... 32 more

展开
收起
游客pth5k7lgmogc4 2023-05-15 10:17:11 337 0
3 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    可能是由于数据类型的不匹配所致。你可以尝试下面这种方法来进行解决:

    在 Debezium 的配置文件中启用 schema.include.list 参数,用于限定解析数据的表。

    在 Flink 中使用 Cast 函数将 INT64 类型转换为 LONG 类型,或者将 LONG 类型转换为 INT 类型。

    例如:

    SELECT CAST(column_name AS LONG) FROM table_name;
    

    或者

    SELECT CAST(column_name AS INT) FROM table_name;
    

    如果以上方法仍然无法解决问题,建议你查看 Debezium 和 Flink 的详细日志,分析具体出现问题的步骤,进而找到问题的根本原因并解决。

    2023-05-23 12:08:38
    赞同 展开评论 打赏
  • 这个错误信息是 Flink 中就某个 SplitFetcher 线程的异常信息,表明该线程在从数据源中轮询数据时遇到了无法预期的异常。

    SplitFetcher 负责从数据源中读取数据,并将读取到的数据传递给相应的 SourceReader,而这个异常信息可能与数据读取和处理相关。下面是一些可能导致这个错误的原因和对应的解决方法:

    1. 数据源中的数据格式错误

    如果数据源中的数据出现了格式错误,可能会导致 SplitFetcher 无法正确地处理这些数据。可以检查数据源中的数据格式是否符合预期,并尝试修改数据源中的数据格式使其符合要求。同时,也可以在相关的代码中加入容错处理,避免出现异常导致程序崩溃。

    1. 网络连接问题

    网络连接问题也可能导致 SplitFetcher 出现异常。可以检查网络连接是否正常,确认 Flink 与数据源之间的通信是否受到了任何限制或干扰,例如防火墙、路由器等。

    1. 应用程序配置不正确

    错误的应用程序配置也可能导致 SplitFetcher 出现异常,如网络连接超时时间过短、内存或线程池分配不足等。可以检查应用程序配置,确认其与数据源和实际负载相符,并进行必要调整。

    1. 数据源中存在过多异常数据

    如果数据源中存在过多异常数据,可能会导致 SplitFetcher 出现异常。可以检查数据源中的数据,确认其中是否存在异常数据,并将异常数据过滤或进行特殊处理。

    1. Flink 版本兼容性问题

    Flink 版本与其他依赖项的版本不一致可能也导致 SplitFetcher 出现异常。可以检查 Flink 以及相应依赖项的版本,并进行必要修改或升级。

    综上所述,以上是 SplitFetcher 线程出现异常的一些常见原因和解决方法。需要根据具体的错误信息和应用场景进行分析和诊断,从而找到最合适的解决方法。

    2023-05-15 11:36:04
    赞同 展开评论 打赏
  • 存在即是合理

    根据提供的异常信息,问题可能是由于DECIMAL类型与INT64类型之间的转换引起的。在的作业代码中,可能存在以下情况:

    1. 原始表中的某些字段是INT64类型,但在创建数据库表时,使用了DECIMAL类型作为这些字段的数据类型。
    2. 在Flink的connector配置中,没有正确设置DECIMAL类型的处理方式,导致数据被错误地解释为INT64类型。

    要解决这个问题,可以尝试以下方法:

    1. 在创建数据库表时,将所有INT64类型的字段更改为DECIMAL类型。这样可以确保在数据源和目标系统之间保持一致的数据类型。
    2. 在Flink的connector配置中,正确设置DECIMAL类型的处理方式。例如,在OracleSourceFunction中添加DecimalTypeInfo.of(precision, scale)来指定DECIMAL类型的精度和长度。

    如果仍然遇到问题,请检查连接字符串是否正确,并确保具有访问Oracle数据库的权限。如果问题仍然存在,请考虑查看Flink的日志文件以获取更多详细信息。

    2023-05-15 11:31:38
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
JAVA开发手册1.5.0 立即下载