使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。

本文主要介绍如何使用Flink  读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。


一 准备工作

本文测试使用 阿里云消息队列Kafka版进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。


1 创建消息队列Kafka资源

需要注意的是,测试使用的kafka的vpc id和flink的vpc保持一致,不一致的话后续需要网络侧去打通网络,不然会报错网络会连接不上。


创建topic:


创建group:


2 创建 holo catalog

(参考https://help.aliyun.com/document_detail/290056.html


3 编写作业

 创建flinksql作业

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'servers:9092',  -- 在kafka接入点获取
  'topic' = 'dry_test',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'dry_group',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'      -- 可选,将嵌套列全部展开,本文主要展示该字段的用法。
);
CREATE TABLE IF NOT EXISTS holodrytest.dry_db1.`dry_test3`
WITH (
  'connector' = 'hologres',
  'createparttable'='true'
) AS TABLE vvp.`default`.kafkaTable;


二 运行演示

1  启动作业

将flinksql作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2  上游数据为非标准的嵌套json

进入kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":"{\"#account_id\":1111,\"user_name\":\"test111\",\"level\":80},\"debug\":1}"}

到目标hologres表查看表数据,发现data中是和json解析的一样的2个字段:


3  上游数据为标准的嵌套json

在kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":{"#account_id":1111,"user_name":"test111","level":80},"debug":1}


到目标hologres表查看表数据,发现结果表中把json中嵌套的字段全都解析成了独立的字段存表:


三 总结拓展

本文主要测试 'json.infer-schema.flatten-nested-columns.enable' = 'true'的用法

schema.flatten-nested-columns.enable

是否递归式地展开JSON中的嵌套列。

Boolean

参数取值如下:

  • true:递归式展开。对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON{"nested": {"col": true}}中的列col,它展开后的名字为nested.col。
  • false(默认值):将嵌套类型当作String处理。

说明该参数仅在Kafka作为CTAS数据同步的数据源时生效。

如上查看结果可以得知,当插入kafka的数据为标准的json格式的时候,是会根据嵌入的数据递归的展开,生成对应的字段。


如果业务侧不想要把所有字段递归展开,也可以使在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。

相关文章
|
23天前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
572 2
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
3月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
381 1
Flink CDC + Hologres高性能数据同步优化实践
|
4月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
872 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
5月前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
568 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
3月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
100 4
|
5月前
|
存储 消息中间件 OLAP
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
本次分享由阿里云产品经理骆撷冬(观秋)主讲,主题为“Hologres+Flink企业级实时数仓核心能力”,是2024实时数仓Hologres线上公开课的第三期。课程详细介绍了Hologres与Flink结合搭建的企业级实时数仓的核心能力,包括解决实时数仓分层问题、基于Flink Catalog的Streaming Warehouse实践,并通过典型客户案例展示了其应用效果。
165 10
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
|
7月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
7月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
10月前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8400 15
畅捷通基于Flink的实时数仓落地实践
|
12月前
|
存储 SQL 消息中间件
Hologres+Flink企业级实时数仓核心能力介绍
通过Hologres+Flink构建易用、统一的企业级实时数仓。