请问Flink中是否可以在 json_query 函数的 path 中使用变量,例如 select json_query( col_a, col_b) from table_a?报错 Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: JSON_QUERY(STRING, STRING, SYMBOL NOT NULL, SYMBOL NOT NULL, SYMBOL NOT NULL) If you think this function should be supported, you can create an issue and start a discussion for it. 但是 就可以执行 select json_query( col_a, '$') from table_a path 使用固定的串,sql-client 执行 sql
在 Flink 中,可以使用 json_query 函数来从 JSON 格式的字符串中提取指定的数据。json_query 函数支持使用字符串常量作为 path 参数,但不支持使用变量作为 path 参数。
如果需要在 json_query 函数的 path 参数中使用变量,可以使用字符串拼接或字符串格式化的方式来构造 path 参数。例如,可以使用 CONCAT 函数或 FORMAT 函数来构造 path 参数,例如:
pgsql
Copy
-- 使用 CONCAT 函数构造 path 参数
SELECT json_query(json_str, CONCAT('$.', column_name))
FROM my_table
WHERE ...
-- 使用 FORMAT 函数构造 path 参数
SELECT json_query(json_str, FORMAT('$.%s', column_name))
FROM my_table
WHERE ...
上述代码中,使用 CONCAT 函数或 FORMAT 函数将字符串常量和变量拼接或格式化成 path 参数,从而实现使用变量作为 path 参数的目的。需
根据您提供的错误信息,当前版本的 Flink 不支持在 json_query
函数的路径中使用变量。这也是导致出现错误的原因。
目前 Flink 中的表达式和函数的参数只支持常量值或列引用作为输入,不支持直接在函数调用中传递变量。所以,您在 JSON_QUERY 函数的路径中使用变量是不被支持的。
如果您确实需要动态构建 JSON 查询路径,一种解决方法是通过编写自定义函数来实现此功能。您可以在自定义函数中接收变量作为参数,并根据变量动态构建 JSON 查询路径。然后,在 Flink 作业中使用这个自定义函数来执行 JSON 查询操作。
下面是一个示例,展示了如何创建一个自定义函数来动态构建 JSON 查询路径:
public class DynamicJsonQueryFunction extends ScalarFunction {
public String eval(String input, String path) {
// 根据输入的变量构建 JSON 查询路径逻辑
String jsonPath = "$." + path;
// 执行 JSON 查询操作并返回结果
return JsonPath.read(input, jsonPath);
}
}
在将自定义函数注册到 Flink 表环境后,您可以在 SQL 查询中使用它:
SELECT DynamicJsonQuery(col_a, col_b) FROM table_a;
请注意,此示例仅演示了一种通用的思路,具体实现可能需要根据您的需求进行调整。
一些常用的json_query函数:
json_query(col_a, '$.key'):查询col_a中JSON对象的key为'key'的值。
json_query(col_a, '$.key1.key2'):查询col_a中JSON对象的key1的key2的值。
json_query(col_a, '$.*'):查询col_a中所有键值对的值。
json_query(col_a, '.key': '.key ′ : ′ value'):查询col_a中JSON对象的key为'key'且对应的value为'$value'的值。
开源的部分建议到社区问问https://qr.dingtalk.com/page/yunpan?route=previewDentry&spaceId=1781248759&fileId=95856306982&type=file,此回答整理自钉群“实时计算Flink产品交流群”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。