使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。

本文主要介绍如何使用Flink  读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。


一 准备工作

本文测试使用 阿里云消息队列Kafka版进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。


1 创建消息队列Kafka资源

需要注意的是,测试使用的kafka的vpc id和flink的vpc保持一致,不一致的话后续需要网络侧去打通网络,不然会报错网络会连接不上。


创建topic:


创建group:


2 创建 holo catalog

(参考https://help.aliyun.com/document_detail/290056.html


3 编写作业

 创建flinksql作业

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'servers:9092',  -- 在kafka接入点获取
  'topic' = 'dry_test',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'dry_group',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'      -- 可选,将嵌套列全部展开,本文主要展示该字段的用法。
);
CREATE TABLE IF NOT EXISTS holodrytest.dry_db1.`dry_test3`
WITH (
  'connector' = 'hologres',
  'createparttable'='true'
) AS TABLE vvp.`default`.kafkaTable;


二 运行演示

1  启动作业

将flinksql作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2  上游数据为非标准的嵌套json

进入kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":"{\"#account_id\":1111,\"user_name\":\"test111\",\"level\":80},\"debug\":1}"}

到目标hologres表查看表数据,发现data中是和json解析的一样的2个字段:


3  上游数据为标准的嵌套json

在kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":{"#account_id":1111,"user_name":"test111","level":80},"debug":1}


到目标hologres表查看表数据,发现结果表中把json中嵌套的字段全都解析成了独立的字段存表:


三 总结拓展

本文主要测试 'json.infer-schema.flatten-nested-columns.enable' = 'true'的用法

schema.flatten-nested-columns.enable

是否递归式地展开JSON中的嵌套列。

Boolean

参数取值如下:

  • true:递归式展开。对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON{"nested": {"col": true}}中的列col,它展开后的名字为nested.col。
  • false(默认值):将嵌套类型当作String处理。

说明该参数仅在Kafka作为CTAS数据同步的数据源时生效。

如上查看结果可以得知,当插入kafka的数据为标准的json格式的时候,是会根据嵌入的数据递归的展开,生成对应的字段。


如果业务侧不想要把所有字段递归展开,也可以使在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。

目录
打赏
0
3
1
2
321
分享
相关文章
淘宝商品评论API接口,json数据示例参考
淘宝开放平台提供了多种API接口来获取商品评论数据,其中taobao.item.reviews.get是一个常用的接口,用于获取指定商品的评论信息。以下是关于该接口的详细介绍和使用方法:
SPL 处理多层 JSON 数据比 DuckDB 方便多了
esProc SPL 处理多层 JSON 数据比 DuckDB 更便捷,尤其在保留 JSON 层次与复杂计算时优势明显。DuckDB 虽能通过 `read_json_auto()` 将 JSON 解析为表格结构,但面对深层次或复杂运算时,SQL 需频繁使用 UNNEST、子查询等结构,逻辑易变得繁琐。而 SPL 以集合运算方式直接处理子表,代码更简洁直观,无需复杂关联或 Lambda 语法,同时保持 JSON 原始结构。esProc SPL 开源免费,适合复杂 JSON 场景,欢迎至乾学院探索!
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
442 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
PHP技巧:解析JSON及提取数据
这就是在PHP世界里探索JSON数据的艺术。这场狩猎不仅仅是为了获得数据,而是一种透彻理解数据结构的行动,让数据在你的编码海洋中畅游。通过这次冒险,你已经掌握了打开数据宝箱的钥匙。紧握它,让你在编程世界中随心所欲地航行。
118 67
配置Nginx根据IP地址进行流量限制以及返回JSON格式数据的方案
最后,记得在任何生产环境部署之前,进行透彻测试以确保一切运转如预期。遵循这些战术,守卫你的网络城堡不再是难题。
58 3
如何在 Postman 中上传文件和 JSON 数据
如果你想在 Postman 中同时上传文件和 JSON 数据,本文将带你一步一步地了解整个过程,包括最佳实践和技巧,让你的工作更轻松。
如何在 Postman 中发送 JSON 数据
我们将深入探讨使用 Postman 发送 JSON 数据这一主题,Postman 是一款强大的 API 测试和开发工具。无论您是经验丰富的开发人员还是新手,掌握这项技能对于高效的 API 测试和开发都至关重要。
怎样用 esProc 计算来自 Restful 的多层 json 数据
esProc 是一款强大的数据处理工具,可简化 Java 处理 Restful 接口返回的复杂多层 JSON 数据的难题。通过 esProc,不仅能轻松访问和解析 Restful 数据,还能高效完成复杂计算任务,并可无缝嵌入 Java 应用中作为计算引擎使用。例如,筛选特定分类订单或计算金额,esProc 的脚本简洁直观,远优于传统 SQL 或纯 Java 实现。此外,esProc 支持安全认证(如 Cookie 和 Token)及 JDBC 集成,为开发者提供灵活高效的解决方案。
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
318 1
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问