I am using a local Flink 1.6 cluster that is configured to use the flink-table jar (meaning my program's jar does not include flink-table). With the following code:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class JMain {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);

        tableEnv.registerFunction("enlist", new Enlister());

        DataSource<Tuple2<String, String>> source = execEnv.fromElements(
            new Tuple2<>("a", "1"),
            new Tuple2<>("a", "2"),
            new Tuple2<>("b", "3")
        );

        Table table = tableEnv.fromDataSet(source, "a, b")
            .groupBy("a")
            .select("enlist(a, b)");

        tableEnv.toDataSet(table, Row.class)
            .print();
    }

    public static class Enlister
            extends AggregateFunction<List<String>, ArrayList<String>>
            implements ResultTypeQueryable<List<String>>
    {
        @Override
        public ArrayList<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> getValue(ArrayList<String> acc) {
            return acc;
        }

        @SuppressWarnings("unused")
        public void accumulate(ArrayList<String> acc, String a, String b) {
            acc.add(a + ":" + b);
        }

        @SuppressWarnings("unused")
        public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
            for (ArrayList<String> otherAcc : it) {
                acc.addAll(otherAcc);
            }
        }

        @SuppressWarnings("unused")
        public void resetAccumulator(ArrayList<String> acc) {
            acc.clear();
        }

        @Override
        public TypeInformation<List<String>> getProducedType() {
            return TypeInformation.of(new TypeHint<List<String>>(){});
        }
    }
}
I get this strange exception:
org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature.
Actual: (java.lang.String, java.lang.String)
Expected: (java.lang.String, java.lang.String)
However, if I don't implement ResultTypeQueryable, I get the expected output:
Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results:
The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface
Is there a way I can fix this?
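Implementing ResultTypeQueryable is not correct in this case, and the exception is misleading. Instead, override getResultType() and getAccumulatorType() on the AggregateFunction. The reason is that generics often cause problems when Flink generates type information for serializers, because of Java's type erasure.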
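A minimal sketch of that change, assuming the Flink 1.6 Table API from the question is on the classpath. It drops ResultTypeQueryable and getProducedType() and instead overrides the two type hooks that AggregateFunction already defines. Reusing TypeHint for both types is carried over from the question's code and is an assumption, not something the answer prescribes; a more specific TypeInformation may be preferable in practice.

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

// Same aggregate as in the question, without ResultTypeQueryable.
public class Enlister extends AggregateFunction<List<String>, ArrayList<String>> {

    @Override
    public ArrayList<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> getValue(ArrayList<String> acc) {
        return acc;
    }

    // Found by the Table API via reflection; not part of an interface.
    public void accumulate(ArrayList<String> acc, String a, String b) {
        acc.add(a + ":" + b);
    }

    public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
        for (ArrayList<String> otherAcc : it) {
            acc.addAll(otherAcc);
        }
    }

    public void resetAccumulator(ArrayList<String> acc) {
        acc.clear();
    }

    // Result type, supplied explicitly because List<String> is erased at runtime.
    @Override
    public TypeInformation<List<String>> getResultType() {
        return TypeInformation.of(new TypeHint<List<String>>() {});
    }

    // Accumulator type (assumption: a TypeHint-based type is sufficient here).
    @Override
    public TypeInformation<ArrayList<String>> getAccumulatorType() {
        return TypeInformation.of(new TypeHint<ArrayList<String>>() {});
    }
}
```

Registration stays as in the question: tableEnv.registerFunction("enlist", new Enlister()). If the class is kept nested inside JMain, it still needs to be declared public static.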