开发者社区> 问答> 正文

pyflink 嵌套使用函数出错怎么解决?

各位大佬好,初学pyflink,有一个问题需要帮忙解决下。

代码为: from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,TableConfig,BatchTableEnvironment from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf from pyflink.datastream import StreamExecutionEnvironment elements = 'aaa|bbb' env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

@udf(input_types=[DataTypes.STRING()],      result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(x):     return x.strip().split("|")

t_env.register_function("split", udf(lambda i: i.strip().split("|"), [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))

t_env.register_function("split", split) #split拆分后为一个2元数组 @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],      result_type=DataTypes.STRING()) def get(array, index):     return array[index]

@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],      result_type=DataTypes.STRING()) def convert(array):     return get(array, 0)

t_env.register_function("convert", convert)

t_env.connect(FileSystem().path('/tmp/output'))
    .with_format(OldCsv()                  .field('b', DataTypes.STRING()))
    .with_schema(Schema()                  .field('b', DataTypes.STRING()))
    .create_temporary_table('mySink')

t_env.from_elements(elements)
     .alias('line')
     .select('split(line)')
     .alias('array')
     .select('convert(array) as b')
     .insert_into('mySink')
     .t_env.execute("convert_job")

报错为: TypeError: 'UserDefinedFunctionWrapper' object is not callable

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:32:32 960 0
1 条回答
写回答
取消 提交回答
  • Hi, 小学生。 把函数get的标签udf给去掉,它只是普通的Python函数,不要加上@udf,加上之后就不是python的函数了。只有Python的UDF你才要加上@udf

    *来自志愿者整理的flink邮件归档

    2021-12-06 22:17:22
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
JS 语言在引擎级别的执行过程 立即下载
fibjs 模块重构从回调到协程--陈垒 立即下载
不止代码 立即下载