
I've been trying for a long time and still can't parse nested JSON. Why?

Could someone take a look for me? Thanks.


Error

Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
    at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

The format and schema for the nested JSON are defined as follows:

.withFormat(new Json()
  .jsonSchema(
    """{
      |  type: 'object',
      |  properties: {
      |    database: { type: 'string' },
      |    table: { type: 'string' },
      |    maxwell_ts: { type: 'integer' },
      |    data: {
      |      type: 'object',
      |      properties: {
      |        reference_id: { type: 'string' },
      |        transaction_type: { type: 'integer' },
      |        merchant_id: { type: 'integer' },
      |        create_time: { type: 'integer' },
      |        status: { type: 'integer' }
      |      }
      |    }
      |  }
      |}""".stripMargin.replaceAll("\n", " ")))
.withSchema(new Schema()
  .field("table", STRING())
  .field("database", STRING())
  .field("data", ROW(
    FIELD("reference_id", STRING()),
    FIELD("transaction_type", INT()),
    FIELD("merchant_id", INT()),
    FIELD("status", INT())))
  //.field("event_time", BIGINT())
  //  .from("maxwell_ts")
  //  .rowtime(new Rowtime()
  //    //.timestampsFromField("ts" * 1000)
  //    .timestampsFromField("ts")
  //    .watermarksPeriodicBounded(60000))
)
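For concreteness, here is a sample record in the shape that schema describes (an editorial illustration; every value is a made-up placeholder, not data from the original post):

// Hypothetical Maxwell-style record matching the JSON schema above;
// all values below are illustrative placeholders.
val sampleRecord =
  """{
    |  "database": "payments_db",
    |  "table": "transaction_tab",
    |  "maxwell_ts": 1638590400,
    |  "data": {
    |    "reference_id": "REF-12345",
    |    "transaction_type": 1,
    |    "merchant_id": 1001,
    |    "create_time": 1638590400,
    |    "status": 2
    |  }
    |}""".stripMargin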

bsTableEnv.sqlUpdate(
  """INSERT INTO yyyyy
    |SELECT table, database,
    |       data.reference_id,
    |       data.transaction_type,
    |       data.merchant_id,
    |       data.create_time,
    |       data.status
    |FROM xxxx""".stripMargin)
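Editorial aside, not from the original thread: table is a reserved keyword in Flink SQL (Calcite), so a column with that name normally needs backtick quoting. A sketch of the same statement with the identifiers quoted (database is quoted too, to be safe):

// Same INSERT as above, but with the reserved/ambiguous column names
// backtick-quoted, which is what Flink SQL normally requires.
bsTableEnv.sqlUpdate(
  """INSERT INTO yyyyy
    |SELECT `table`, `database`,
    |       data.reference_id,
    |       data.transaction_type,
    |       data.merchant_id,
    |       data.create_time,
    |       data.status
    |FROM xxxx""".stripMargin)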

From the volunteer-curated Flink mailing list archive.

小阿怪 2021-12-04 19:12:37
1 answer
  • You could try defining the source and format directly with DDL. For data like yours, the DDL would look roughly like this:

    create table my_source (
      database varchar,
      maxwell_ts bigint,
      table varchar,
      data row<
        transaction_sn varchar,
        parent_id int,
        user_id int,
        amount int,
        reference_id varchar,
        status int,
        transaction_type int,
        merchant_id int,
        update_time int,
        create_time int
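    The archived answer breaks off before the statement is closed. For completeness, a minimal sketch of the finished DDL, submitted through the same sqlUpdate call used above. The WITH clause is an assumption (a Flink 1.10-era Kafka source with JSON format; the topic and server address are placeholders), and the reserved column names table/database are backtick-quoted:

    // Sketch of the completed DDL; connector/format properties are assumed,
    // not taken from the thread -- adjust them to your actual source.
    bsTableEnv.sqlUpdate(
      """CREATE TABLE my_source (
        |  `database` VARCHAR,
        |  maxwell_ts BIGINT,
        |  `table` VARCHAR,
        |  data ROW<
        |    transaction_sn VARCHAR,
        |    parent_id INT,
        |    user_id INT,
        |    amount INT,
        |    reference_id VARCHAR,
        |    status INT,
        |    transaction_type INT,
        |    merchant_id INT,
        |    update_time INT,
        |    create_time INT>
        |) WITH (
        |  -- placeholder connector/format properties (Flink 1.10-era keys);
        |  -- replace with whatever your actual source uses
        |  'connector.type' = 'kafka',
        |  'connector.version' = 'universal',
        |  'connector.topic' = 'maxwell_topic',
        |  'connector.properties.bootstrap.servers' = 'localhost:9092',
        |  'format.type' = 'json',
        |  'update-mode' = 'append'
        |)""".stripMargin)

    With the source declared this way, nested columns can be referenced directly in SQL as data.reference_id, data.transaction_type, and so on.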

    From the volunteer-curated Flink mailing list archive.

    2021-12-04 22:30:27