Flink CDC 程序在本地执行没有任何报错 数据也在正常进表 但是提交到集群执行就给我报了个org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.aliyun.openservices.log.flink.model.LogDataFetcher.emitRecordAndUpdateState(LogDataFetcher.java:357) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.aliyun.openservices.log.flink.model.ShardConsumer.processRecordsAndSaveOffset(ShardConsumer.java:242) ~[ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.aliyun.openservices.log.flink.model.ShardConsumer.run(ShardConsumer.java:166) [ywwlflink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
这个错误 本地加了字段判断也没有null值 这是咋回事啊?
您好,这个错误的提示信息如下:
org.apache.flink.table.api.ValidationException: Cannot convert null to INT
这个错误表示,您在 Flink 中使用了 int 类型的字段,但是您传入的数据为 null。
您可以使用 isPresent() 方法判断数据是否为 null,如果数据为 null,则不进行类型转换。
以下是一个简单的示例:
int value = null;
if (value != null) {
System.out.println("value: " + value);
}
这个示例会在 value 不为 null 时,打印出 value 的值。
希望这个回答对您有所帮助。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。