使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()], result_type=DataTypes.ROW( [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()), DataTypes.FIELD('aveBuy', DataTypes.INT())), func_type='pandas') def orderCalc(code, amount):
df = pd.DataFrame({'code': code, 'amount': amount})
return (output['buyQtl'], output['aveBuy'])
定义了csv的sink如下
create table csvSink ( buyQtl BIGINT, aveBuy INT ) with ( 'connector.type' = 'filesystem', 'format.type' = 'csv', 'connector.path' = 'e:/output' )
然后进行如下的操作:
result_table = t_env.sql_query(""" select orderCalc(code, amount) from some_source
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount """) result_table.execute_insert("csvSink")
在执行程序的时候提示没法入库
py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.
: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.csvSink' do not match.
Cause: Different number of columns.
Query schema: [EXPR$0: ROW<buyQtl
BIGINT, aveBuy
INT >]
Sink schema: [buyQtl: BIGINT, aveBuy: INT]
at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx ception(DynamicSinkUtils.java:304)
at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply ImplicitCast(DynamicSinkUtils.java:134)
是UDF的输出结构不对吗,还是需要调整sink table的结构?*来自志愿者整理的flink邮件归档
是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
你可以尝试将sql语句改成以下形式:
select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) from some_source
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。