开发者社区> 问答> 正文

flink1.11 TableEnvironment 不支持注册 Aggregate Functio

flink1.11 在TableEnvironment环境中注册并使用自定义的Aggregate Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)

org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet. at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616) at org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511) at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) at com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48) at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

// 以下是代码 // main EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// 注册source table, jdbc table source tEnv.executeSql("CREATE TABLE wx_event_log (....) with ('connect.type'='jdbc'),....");

// 注册sink table,csv table sink tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with ('connect.type'='filesystem','format.type'='csv',.....)");

// 注册agg function tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log where msg_type='text' and create_time between '2020-03-20' and '2020-03-21'");

table2.groupBy($("from_user"))

.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today")) .select($("from_user"),$("first_send_msg_today")) .executeInsert("wx_data_statistics");

// 自定义agg function类 public class FirstSendMsgFunc extends AggregateFunction<LocalDateTime,CountDTO> {

public void accumulate(CountDTO acc, LocalDateTime createTime) { if (acc.getDateTime() == null) { acc.setDateTime(createTime); } else if (acc.getDateTime().isAfter(createTime)) { acc.setDateTime(createTime); } }

@Override public LocalDateTime getValue(CountDTO acc) { return acc.getDateTime(); }

@Override public CountDTO createAccumulator() { return new CountDTO(); } }

// accumulate pojo 类 public class CountDTO implements Serializable {

private Integer count;

private LocalDateTime dateTime;

public Integer getCount() { return count; }

public void setCount(Integer count) { this.count = count; }

public LocalDateTime getDateTime() { return dateTime; }

public void setDateTime(LocalDateTime dateTime) { this.dateTime = dateTime; } }

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:34:29 628 0
1 条回答
写回答
取消 提交回答
  • 1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持 AggregateFunction。 你说 StreamTableEnvironment 可以,我估计你用的是 StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。

    *来自志愿者整理的flink邮件归档

    2021-12-06 11:22:26
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载