使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。

本文主要介绍如何使用Flink  读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。


一 准备工作

本文测试使用 阿里云消息队列Kafka版进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。


1 创建消息队列Kafka资源

需要注意的是,测试使用的kafka的vpc id和flink的vpc保持一致,不一致的话后续需要网络侧去打通网络,不然会报错网络会连接不上。


创建topic:


创建group:


2 创建 holo catalog

(参考https://help.aliyun.com/document_detail/290056.html


3 编写作业

 创建flinksql作业

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'servers:9092',  -- 在kafka接入点获取
  'topic' = 'dry_test',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'dry_group',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'      -- 可选,将嵌套列全部展开,本文主要展示该字段的用法。
);
CREATE TABLE IF NOT EXISTS holodrytest.dry_db1.`dry_test3`
WITH (
  'connector' = 'hologres',
  'createparttable'='true'
) AS TABLE vvp.`default`.kafkaTable;


二 运行演示

1  启动作业

将flinksql作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2  上游数据为非标准的嵌套json

进入kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":"{\"#account_id\":1111,\"user_name\":\"test111\",\"level\":80},\"debug\":1}"}

到目标hologres表查看表数据,发现data中是和json解析的一样的2个字段:


3  上游数据为标准的嵌套json

在kafka控制台,制造生产如下的测试数据

{"appid":"20221020","data":{"#account_id":1111,"user_name":"test111","level":80},"debug":1}


到目标hologres表查看表数据,发现结果表中把json中嵌套的字段全都解析成了独立的字段存表:


三 总结拓展

本文主要测试 'json.infer-schema.flatten-nested-columns.enable' = 'true'的用法

schema.flatten-nested-columns.enable

是否递归式地展开JSON中的嵌套列。

Boolean

参数取值如下:

  • true:递归式展开。对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON{"nested": {"col": true}}中的列col,它展开后的名字为nested.col。
  • false(默认值):将嵌套类型当作String处理。

说明该参数仅在Kafka作为CTAS数据同步的数据源时生效。

如上查看结果可以得知,当插入kafka的数据为标准的json格式的时候,是会根据嵌入的数据递归的展开,生成对应的字段。


如果业务侧不想要把所有字段递归展开,也可以使在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。

相关文章
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
135 0
|
1月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
36 7
|
1月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
39 4
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
130 0
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
41 0
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何将Hologres字段转换为小写
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8314 15
畅捷通基于Flink的实时数仓落地实践
|
3月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。

热门文章

最新文章