使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。

本文主要介绍如何使用Flink  读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。


一 准备工作

本文测试使用 阿里云消息队列Kafka版进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。


1 创建消息队列Kafka资源

需要注意的是,测试使用的kafka的vpc id和flink的vpc保持一致,不一致的话后续需要网络侧去打通网络,不然会报错网络会连接不上。


创建topic:


创建group:


2 创建 holo catalog

(参考https://help.aliyun.com/document_detail/290056.html


3 编写作业

 创建flinksql作业

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'servers:9092',  -- 在kafka接入点获取
  'topic' = 'dry_test',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'dry_group',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'      -- 可选,将嵌套列全部展开,本文主要展示该字段的用法。
);
CREATE TABLE IF NOT EXISTS holodrytest.dry_db1.`dry_test3`
WITH (
  'connector' = 'hologres',
  'createparttable'='true'
) AS TABLE vvp.`default`.kafkaTable;


二 运行演示

1  启动作业

将flinksql作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2  上游数据为非标准的嵌套json

进入kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":"{\"#account_id\":1111,\"user_name\":\"test111\",\"level\":80},\"debug\":1}"}

到目标hologres表查看表数据,发现data中是和json解析的一样的2个字段:


3  上游数据为标准的嵌套json

在kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":{"#account_id":1111,"user_name":"test111","level":80},"debug":1}


到目标hologres表查看表数据,发现结果表中把json中嵌套的字段全都解析成了独立的字段存表:


三 总结拓展

本文主要测试 'json.infer-schema.flatten-nested-columns.enable' = 'true'的用法

schema.flatten-nested-columns.enable

是否递归式地展开JSON中的嵌套列。

Boolean

参数取值如下:

  • true:递归式展开。对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON{"nested": {"col": true}}中的列col,它展开后的名字为nested.col。
  • false(默认值):将嵌套类型当作String处理。

说明该参数仅在Kafka作为CTAS数据同步的数据源时生效。

如上查看结果可以得知,当插入kafka的数据为标准的json格式的时候,是会根据嵌入的数据递归的展开,生成对应的字段。


如果业务侧不想要把所有字段递归展开,也可以使在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。

目录
打赏
0
3
1
2
320
分享
相关文章
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
161 1
基于 Flink 的中国电信星海时空数据多引擎实时改造
淘宝商品详情API的调用流程(python请求示例以及json数据示例返回参考)
JSON数据示例:需要提供一个结构化的示例,展示商品详情可能包含的字段,如商品标题、价格、库存、描述、图片链接、卖家信息等。考虑到稳定性,示例应基于淘宝开放平台的标准响应格式。
如何在 Postman 中上传文件和 JSON 数据
如果你想在 Postman 中同时上传文件和 JSON 数据,本文将带你一步一步地了解整个过程,包括最佳实践和技巧,让你的工作更轻松。
如何在 Postman 中发送 JSON 数据
我们将深入探讨使用 Postman 发送 JSON 数据这一主题,Postman 是一款强大的 API 测试和开发工具。无论您是经验丰富的开发人员还是新手,掌握这项技能对于高效的 API 测试和开发都至关重要。
怎样简单计算来自 Restful 的多层 json 数据
esProc 是处理 Restful 接口 JSON 数据的强大工具,能简化多层结构解析与复杂计算。通过 httpfile 函数访问 Restful 数据并转化为序表,支持清晰的多层数据引用与筛选。例如,筛选特定分类且订单金额不低于 200 的订单轻松实现。此外,esProc 可嵌入 Java 应用,通过 JDBC 调用 SPL 脚本,作为应用内计算引擎使用,提升开发效率。还支持身份认证与安全控制,满足企业级需求。
怎样用 esProc 计算来自 Restful 的多层 json 数据
esProc 是一款强大的数据处理工具,可简化 Java 处理 Restful 接口返回的复杂多层 JSON 数据的难题。通过 esProc,不仅能轻松访问和解析 Restful 数据,还能高效完成复杂计算任务,并可无缝嵌入 Java 应用中作为计算引擎使用。例如,筛选特定分类订单或计算金额,esProc 的脚本简洁直观,远优于传统 SQL 或纯 Java 实现。此外,esProc 支持安全认证(如 Cookie 和 Token)及 JDBC 集成,为开发者提供灵活高效的解决方案。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
何如定义 JSON Schema 并验证该 json 数据?
本文定义了一个包含 audio 和 tags 两个必需属性的 JSON Schema,用于规范数据结构。其中,audio 是非空字符串,表示音频组件;tags 是非空数组,表示标签组件。通过示例数据和验证工具(如 ajv, NJsonSchema),可确保 JSON 数据符合 Schema 要求,从而保障数据的一致性和正确性。
70 1
淘宝商品详情API接口概述与JSON数据示例
淘宝商品详情API是淘宝开放平台提供的核心接口之一,为开发者提供了获取商品深度信息的能力。以下是技术细节和示例:
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等