开发者社区> 问答> 正文

flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

本人使用flink版本为1.11.0,自定义udaf如下:

public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

@Override

@DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")

public Row getValue(ContinuousListenDuration acc) {

return Row.of(acc.getStartTime(), acc.getDuration());

}

@Override

public ContinuousListenDuration createAccumulator() {

return new ContinuousListenDuration();

}

public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {

// 此处省略逻辑

}

}

聚合时以Timestamp(3)、Boolean作为参数,getValue返回类型是ROW<startTime TIMESTAMP(3), duration BIGINT>,函数名定义为get_continuous_listen_duration,调用该函数的sql如下:

insert into

report.result

select

id,

city_code,

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).startTime as start_time,

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).duration as duration

from

(

select

o.id,

o.dt,

o.order_no,

r.city_code

from

(

select

req [1] as id,

dt,

proctime,

req [2] as order_no

from

tmp_v

where

extra [1] is null

or extra [1] <> 'false'

) o

JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id

) a

group by

id,

city_code

having

get_continuous_listen_duration(

dt,

(order_no is null)

or (trim(order_no) = '')

).duration >= 2

运行时发生如下异常:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]

at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]

at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]

at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

我的问题是这样定义udf有什么问题吗?*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 11:18:25 1223 0
1 条回答
写回答
取消 提交回答
  • 从这行报错堆栈来看:at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) , 应该是在对 row.startTime 或者 row. duration validate 阶段,推断类型时识别出不兼容类型,可以检测下用法有没有错误。*来自志愿者整理的flink邮件归档

    2021-12-07 11:23:56
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载