执行如下代码提示异常,改为旧方法StreamTableEnvironment.registerFunction执行正常, Flink version: 1.11.1
package com.test;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row;
public class TestUTDFOk { public static class UDTF extends TableFunction {
public void eval(String input) { Row row = new Row(3); row.setField(0, input); row.setField(1, input.length()); row.setField(2, input + 2); collect(row); } }
public static class UDF extends ScalarFunction { public String eval(Row row, Integer index) { try { return String.valueOf(row.getField(index)); } catch (Exception e) { throw e; } } }
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); // tEnv.registerFunction("udtf", new UDTF()); // tEnv.registerFunction("udf", new UDF()); tEnv.createTemporarySystemFunction("udtf", new UDTF()); tEnv.createTemporarySystemFunction("udf", new UDF());
tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", "c").as("f0")); String sinkDDL = "create table sinkTable (" + "f0 String" + ", x String" + ", y String" + ", z String" + ") with (" + " 'connector.type' = 'filesystem'," + " 'format.type' = 'csv'," + " 'connector.path' = 'F:\workspace\douyu-git\bd-flink\core\logs\a.csv'" + ")"; String udtfCall = "insert into sinkTable SELECT S.f0" + ", udf(f1, 0) as x" + ", udf(f1, 1) as y" + ", udf(f1, 2) as z" + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";
tEnv.executeSql(sinkDDL); tEnv.executeSql(udtfCall); } }
异常如下: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'udf'. at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.test.TestUTDFOk.main(TestUTDFOk.java:64) Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'udf'. at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) at java.util.Optional.flatMap(Optional.java:241) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99) at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 7 more Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes and/or provide a corresponding hint. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85) at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162) ... 19 more Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158) ... 22 more Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method: public java.lang.String com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer) at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110) ... 24 more Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org.apache.flink.types.Row' in parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'. Please pass the required data type manually or allow RAW types. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375) at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164) ... 25 more Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305) at org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238) ... 45 more
Process finished with exit code 1 *来自志愿者整理的flink邮件归档
1.11中引入的新的udf注册接口,使用的是新的udf类型推断机制,所以会有上面的问题。 你可以参考新的udf类型推导文档[1] 来写一下type hint试试
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。