开发者社区> 问答> 正文

使用pyflink的Table api时pyflink udf函数报错怎么办?

目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())

def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return json.dumps(message)

st_env
.form_path("source")
.select("drop_fields(message,'x')")
.insert_into("sink")

message 格式: {“a”:"1","x","2"}

报错参数类型不匹配: Actual:(java.lang.String, java.lang.String) Expected:(org.apache.flink.table.dataformat.BinaryString)

新手入门,请多指教,感谢。

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:49:58 1458 0
1 条回答
写回答
取消 提交回答
  • The input types should be as following:

    input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:58:23
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载