开发者社区> 问答> 正文

flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空

可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码Problem2.java: package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;

/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 {

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{"type": "object", "properties": {"action": {"type": "string"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(action)) as T(word)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem"); } }

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:19:16 690 0
1 条回答
写回答
取消 提交回答
  • 这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题?

    [1] https://issues.apache.org/jira/browse/FLINK-18002

    *来自志愿者整理的flink邮件归档

    2021-12-06 17:01:07
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
李劲松|Flink Table Store 典型应用场景 立即下载
《Apache Flink-重新定义计算》PDF下载 立即下载