各位大佬好,初学pyflink,有一个问题需要帮忙解决下。
代码为: from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,TableConfig,BatchTableEnvironment from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf from pyflink.datastream import StreamExecutionEnvironment elements = 'aaa|bbb' env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(x): return x.strip().split("|")
t_env.register_function("split", split) #split拆分后为一个2元数组 @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING()) def get(array, index): return array[index]
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())], result_type=DataTypes.STRING()) def convert(array): return get(array, 0)
t_env.register_function("convert", convert)
t_env.connect(FileSystem().path('/tmp/output'))
.with_format(OldCsv() .field('b', DataTypes.STRING()))
.with_schema(Schema() .field('b', DataTypes.STRING()))
.create_temporary_table('mySink')
t_env.from_elements(elements)
.alias('line')
.select('split(line)')
.alias('array')
.select('convert(array) as b')
.insert_into('mySink')
.t_env.execute("convert_job")
报错为: TypeError: 'UserDefinedFunctionWrapper' object is not callable
*来自志愿者整理的flink邮件归档
Hi, 小学生。 把函数get的标签udf给去掉,它只是普通的Python函数,不要加上@udf,加上之后就不是python的函数了。只有Python的UDF你才要加上@udf
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。