本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。
一 准备工作
本文测试使用 阿里云消息队列Kafka版进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。
1 创建消息队列Kafka资源
需要注意的是,测试使用的kafka的vpc id和flink的vpc保持一致,不一致的话后续需要网络侧去打通网络,不然会报错网络会连接不上。
创建topic:
创建group:
2 创建 holo catalog
(参考https://help.aliyun.com/document_detail/290056.html)
3 编写作业
创建flinksql作业
CREATE TEMPORARY TABLE kafkaTable ( `offset` INT NOT NULL METADATA, `part` BIGINT NOT NULL METADATA FROM 'partition', PRIMARY KEY (`part`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'servers:9092', -- 在kafka接入点获取 'topic' = 'dry_test', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'dry_group', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true' -- 可选,将嵌套列全部展开,本文主要展示该字段的用法。 ); CREATE TABLE IF NOT EXISTS holodrytest.dry_db1.`dry_test3` WITH ( 'connector' = 'hologres', 'createparttable'='true' ) AS TABLE vvp.`default`.kafkaTable;
二 运行演示
1 启动作业
将flinksql作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行
2 上游数据为非标准的嵌套json
进入kafka控制台,制造生产如下的测试数据
{"appid":"20221020","data":"{\"#account_id\":1111,\"user_name\":\"test111\",\"level\":80},\"debug\":1}"}
到目标hologres表查看表数据,发现data中是和json解析的一样的2个字段:
3 上游数据为标准的嵌套json
在kafka控制台,制造生产如下的测试数据
{"appid":"20221020","data":{"#account_id":1111,"user_name":"test111","level":80},"debug":1}
到目标hologres表查看表数据,发现结果表中把json中嵌套的字段全都解析成了独立的字段存表:
三 总结拓展
本文主要测试 'json.infer-schema.flatten-nested-columns.enable' = 'true'的用法
schema.flatten-nested-columns.enable |
是否递归式地展开JSON中的嵌套列。 |
否 |
Boolean |
参数取值如下:
说明该参数仅在Kafka作为CTAS数据同步的数据源时生效。 |
如上查看结果可以得知,当插入kafka的数据为标准的json格式的时候,是会根据嵌入的数据递归的展开,生成对应的字段。
如果业务侧不想要把所有字段递归展开,也可以使在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。