使用Table AggregateFunction和ResultTypeQueryable时的ValidationException-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

使用Table AggregateFunction和ResultTypeQueryable时的ValidationException

2018-12-10 13:06:17 6402 1

我正在使用配置为使用flink-tablejar 的本地Flink 1.6集群(意味着我的程序的jar不包括在内flink-table)。使用以下代码

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class JMain {

public static void main(String[] args) throws Exception {
    ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);

    tableEnv.registerFunction("enlist", new Enlister());

    DataSource<Tuple2<String, String>> source = execEnv.fromElements(
            new Tuple2<>("a", "1"),
            new Tuple2<>("a", "2"),
            new Tuple2<>("b", "3")
    );

    Table table = tableEnv.fromDataSet(source, "a, b")
            .groupBy("a")
            .select("enlist(a, b)");

    tableEnv.toDataSet(table, Row.class)
            .print();
}

public static class Enlister
        extends AggregateFunction<List<String>, ArrayList<String>>
        implements ResultTypeQueryable<List<String>>
{
    @Override
    public ArrayList<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> getValue(ArrayList<String> acc) {
        return acc;
    }

    @SuppressWarnings("unused")
    public void accumulate(ArrayList<String> acc, String a, String b) {
        acc.add(a + ":" + b);
    }

    @SuppressWarnings("unused")
    public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
        for (ArrayList<String> otherAcc : it) {
            acc.addAll(otherAcc);
        }
    }

    @SuppressWarnings("unused")
    public void resetAccumulator(ArrayList<String> acc) {
        acc.clear();
    }

    @Override
    public TypeInformation<List<String>> getProducedType() {
        return TypeInformation.of(new TypeHint<List<String>>(){});
    }
}

}
我得到了这个奇怪的例外:

org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature.
Actual: (java.lang.String, java.lang.String)
Expected: (java.lang.String, java.lang.String)
但是,如果我没有实现ResultTypeQueryable,我得到预期的输出:

Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results:

  • 56e0e5a9466b84ae44431c9c4b7aad71 (java.util.ArrayList) [2 elements]
    我的实际用例需要ResultTypeQueryable,否则我得到这个例外:

The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface
我能解决这个问题吗?

取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:10

    ResultTypeQueryable在这种情况下,实施是不正确的。例外是误导性的。而是覆盖getResultType()和getAccumulatorType()。这背后的原因是,在为序列化器生成类型信息时,泛型通常会导致问题(由于Java的类型擦除)。

    0 0
相关问答

1

回答

.OSSException: The difference between the request

2018-08-24 13:36:11 539浏览量 回答数 1

1

回答

请帮忙查询ogbio.com具体是域名还是空间的问题,无法访问

2019-02-14 18:16:35 329浏览量 回答数 1

1

回答

一个奇怪的问题oracle大字段操作clob.getCharacterStream()访问出现连接关闭错误

2016-03-19 10:22:38 6896浏览量 回答数 1

1

回答

hibernate delete语句报QueryParameterException错误

2016-06-02 13:48:56 2256浏览量 回答数 1

0

回答

android上传图片报NonRepeatableRequestException

2015-03-19 16:50:21 8038浏览量 回答数 0

1

回答

CertPathValidatorException -Java报错

2020-05-29 21:50:55 1541浏览量 回答数 1

1

回答

Jupyter中R语言中的regexpr(pattern,text)方法的作用是什么?

2021-12-01 17:16:05 82浏览量 回答数 1

1

回答

Action算子中的aggregate的作用是什么?

2021-12-06 22:19:08 269浏览量 回答数 1

1

回答

update不支持更新distribution key的原因和解决办法

2022-01-27 18:07:11 5533浏览量 回答数 1

0

回答

请问,调用CreateOrUpdateAlertRule接口报错,提示“请按照文档填写一种指标类型,

2022-08-31 10:50:31 374浏览量 回答数 0
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
文章
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载