开发者社区> 问答> 正文

flink1.10在通过TableFunction实现行转列时,Row一直是空

我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, 那么在eval方法接收到的就是Row[], 问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可还原车祸场景: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码1:Problem.java package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;

/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem {

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode2", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable (\n" + " action ARRAY<\n" + " ROW<" + " actionID STRING,\n" + " actionName STRING\n" + " >\n" + " >\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json'\n" + ")"; bsEnv.sqlUpdate(ddlSource);

// Table table = bsEnv.sqlQuery("select action from actionTable"); Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(action)) as T(word)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print("==tb==");

bsEnv.execute("ARRAY tableFunction Problem"); } }

代码2:ExplodeFunction.java package com.flink;

import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row;

import java.util.ArrayList; import java.util.Arrays;

public class ExplodeFunction extends TableFunction {

public void eval(Row[] values) { System.out.println(values.length); if (values.length > 0) { for (Row row : values) { if (row != null) {// 这里debug出来的row总是空 ArrayList list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { Object field = row.getField(i); list.add(field); }

collector.collect(Row.of(Arrays.toString(list.toArray()))); } } } } }

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:17:05 862 0
1 条回答
写回答
取消 提交回答
  • 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 https://issues.apache.org/jira/browse/FLINK-17855

    *来自志愿者整理的flink邮件归档

    2021-12-06 16:59:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink SQL in 2020 立即下载
Flink Streaming SQL 2018 立即下载
基于 Flink SQL + Paimon 构建流式湖仓新方 立即下载