阿里云全托管flink-vvp平台hudi connector实践(基于emr集群oss-hdfs存储)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云全托管flink-vvp平台hudi sink connector实践,本文数据湖hudi基于阿里云E-MapReduce产品,以云对象存储oss-hdfs作为存储

1. 上游数据准备

上游数据以mysql为例作为hudi入湖的上游应用表,详细的建表、插入及更新语句如下

droptable if exists `sunyf_db`.`flink_test_02_hudi`;createtable if not EXISTS `sunyf_db`.`flink_test_02_hudi`(  id BIGINTnotnull PRIMARY KEY 
,`name` varchar(20),datime TIMESTAMP,price FLOAT);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(1,'zhao', CURRENT_TIMESTAMP,20);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(2,'qian', CURRENT_TIMESTAMP,30);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(3,'sun', CURRENT_TIMESTAMP,40);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(4,'li', CURRENT_TIMESTAMP,50);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(5,'zhou', CURRENT_TIMESTAMP,60);select*from `sunyf_db`.`flink_test_02_hudi`
update `sunyf_db`.`flink_test_02_hudi` set `name` ='sunyf35'where id =3;


2. Flink-SQL任务

flink-hudi-connector参数参考:https://hudi.apache.org/docs/basic_configurations#Flink-Options

--********************************************************************---- Author:         sunyf-- Created Time:   2023-07-03 17:52:47-- Description:    Write your description here-- Hints:          You can use SET statements to modify the configuration--********************************************************************--CREATE TEMPORARY TABLE mysqlcdc_source
(    id            bigint,`name`      STRING
,datime       TIMESTAMP,price      float,PRIMARY KEY (id)NOT ENFORCED
)WITH ('connector'='mysql','hostname'='rm-xxx.mysql.rds.aliyuncs.com','port'='3306','username'='用户名','password'='密码','database-name'='sunyf_db','table-name'='flink_test_02_hudi');CREATE TEMPORARY TABLE hudi_sink
(    id       bigintNOTNULL,`name`  STRING
,`price` float,PRIMARY KEY (id)NOT ENFORCED
-- 或者直接在id字段后面写 id int not null primary key 也可以-- 或者 指定如下参数 hoodie.datasource.write.recordkey.field)WITH ('connector'='hudi','oss.endpoint'='oss-cn-xx-internal.aliyuncs.com','accessKeyId'='用户名','accessKeySecret'='密码','path'='oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_test_03_hudi_cow','table.type'='COPY_ON_WRITE'--MERGE_ON_READ,'write.operation'='insert'-- ,'precombine.field' = 'ts'-- ***************-- 下面的元数据相关参数如果不指定的话--hive_sync.enable=false,仅落地文件,需要参考(标题3)中的两种方案建表,'hive_sync.db'='emr_hudi_cluster0630','hive_sync.table'='flink_test_03_hudi_cow','hive_sync.enable'='true','hive_sync.mode'='hms','dlf.catalog.endpoint'='dlf-vpc.cn-beijing.aliyuncs.com','dlf.catalog.region'='cn-beijing'-- ***************-- 在线压缩参数-- ,'compaction.async.enabled' = 'false'-- 指定分区字段,'hoodie.datasource.write.partitionpath.field'='name'-- 指定使用hive的分区表达方式来展示-- 文件夹名为:(day=20230101),不然的话分在oss上展示为20230101,'hoodie.datasource.write.hive_style_partitioning'='true');CREATE TEMPORARY TABLE print_sink
(    id       bigintNOTNULL,`name`  STRING
,datime       TIMESTAMP,`price` float,PRIMARY KEY (id)NOT ENFORCED
)WITH ('connector'='print','logger'='true','print-identifier'='sunyfOutputFormat');BEGIN STATEMENT SET;INSERTINTO hudi_sink
SELECTid
,`name`
,price
FROM mysqlcdc_source
;INSERTINTO print_sink
SELECT*FROM mysqlcdc_source
;end;

3. 建表方案

3.1. 创建hive外表读hudi文件

-- 使用hive外表的时候可以不指定flink hudi sink表中hive_sync 相关参数-- 直接采用外表的属性即可,这个外表的数据没有hudi的元数据字段createtable flink_sink_hudi_externel
(    id    bigintnotnull primary key
,`name` string
,price  float)ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION 'oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_sink_hudi';

3.2. 创建spark hudi外表

# /bin/bash on spark 3.xspark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'# sql on spark
use emr_hudi_cluster0630;# 指定location在oss而非默认的oss-hdfs((标题4)中有相关说明)createtable emr_hudi_cluster0630.spark_hudi_location_cow(  id bigint,  name string,  price float)using hudi options(type ='cow',primaryKey='id')location 'oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/spark_hudi_location_cow';-- 为了避免字段类型不一致,导致的spark报错,这里伪造的原始数据都进行了一下cast-- 有些值cast过后会有精度的损失,hiveinsertinto emr_hudi_cluster0630.spark_hudi_location_cowvalues(cast(1asbigint),'sun',       cast(999.2asfloat)),(cast(2asbigint),'li',        cast(12.1asfloat));

3.3. 外表的删除

# 任一客户端执行:droptable emr_hudi_cluster0630.flink_test_02_hudi_cow;# 外表要手动删除oss上的文件,drop表仅drop元数据,不会删除数据文件
hdfs dfs -rm -f -r oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_test_02_hudi_cow

4. hudi表更新

通过修改参数实现cow表的实时更新

'write.operation' = 'insert'

image.png

'write.operation' = 'upsert'

image.png

5. 踩坑问题

5.1. spark创建hudi表

创建的时候指定location,desc的时候就会看到表是EXTERNAL 而不是 MANAGED,要注意表删除时相关数据是否留存或者删表重建,数据异常的问题

5.2. spark与hive获取元数据差异

在oss或者通过hdfs dfs命令对hudi表的文件进行删除或者更改后,维持当前的spark-sql-session是不能获取到这一元数据更改的,select查询该表会报文件不存在的异常,需要重启spark-sql客户端或者

refresh table[table_name]

进行元数据更新,hive侧正常执行。

5.3. 字段类型报错

hudi表字段类型与flink的表结构中字段类型要对应,否则在某些增量(手动写入+flink写入)场景下,可能导致数据读取异常(spark),hive兼容性较好,会进行字段类型的隐式转换,但是会有精度损失,如double->float

image.pngimage.png

5.4. flink hudi con对oss-hdfs支持问题

  1. 根据dlf中表存储的path(图3)直接添加到with参数(图1)中,会报非法参数异常(图2)
  2. 尝试:将oss-hdfs的路径后面的endpoint更改到endpoint参数中,jobmanager可以正常启动,但是写下的文件还是不在该内表指定的oss-hdfs存储路径下(日志中可以观察到,图4),oss上验证文件写入情况,如(图5,6)
  3. 查了下oss-hdfs的文档,应该是目前还不支持这个connector直接写默认存储在oss-hdfs的表。
  1. 参考文档:https://help.aliyun.com/document_detail/419069.html?spm=a2c4g.427753.0.i0
  2. 参考文档:https://help.aliyun.com/document_detail/471050.html?spm=a2c4g.141562.0.i6
  1. 绕行方案:在hudi建表的时候指定location在oss正常的bucket路径中,而不是oss-hdfs,可以正常使用image.pngimage.pngimage.pngimage.pngimage.pngimage.png

5.5. oss目录未删除问题

oss-hdfs 删除的db未删除(图1),dlf中已删除(图2 show databases 与dlf结果一致)

hdfs dfs -rm -f -r oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_sink_hudi_location

image.png

image.png

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
10天前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
本文介绍了阿里云EMR StarRocks在数据湖分析领域的应用,涵盖StarRocks的数据湖能力、如何构建基于Paimon的实时湖仓、StarRocks与Paimon的最新进展及未来规划。文章强调了StarRocks在极速统一、简单易用方面的优势,以及在数据湖分析加速、湖仓分层建模、冷热融合及全链路ETL等场景的应用。
188 2
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
|
3天前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
讲师焦明烨介绍了StarRocks的数据湖能力,如何使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓,StarRocks与Paimon的最新进展及未来规划。
18 3
|
1月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
87 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
1月前
|
SQL 存储 NoSQL
阿里云 EMR StarRocks 在七猫的应用和实践
本文整理自七猫资深大数据架构师蒋乾老师在 《阿里云 x StarRocks:极速湖仓第二季—上海站》的分享。
193 2
|
2月前
|
存储 分布式计算 大数据
大数据革新在即,阿里云EMR如何布局DeltaLake引领行业潮流?
【8月更文挑战第26天】大数据时代,实时处理与分析能力对企业至关重要。Delta Lake 作为高性能、可靠且支持 ACID 事务的开源存储层,已成为业界焦点。阿里云 EMR 深度布局 Delta Lake,计划深化集成、强化数据安全、优化实时性能,并加强生态建设与社区贡献。通过与 Spark 的无缝对接及持续的技术创新,阿里云 EMR 致力于提供更高效、安全的数据湖解决方案,引领大数据处理领域的发展新方向。
40 3
|
2月前
|
安全 数据管理 大数据
数据湖的未来已来:EMR DeltaLake携手阿里云DLF,重塑企业级数据处理格局
【8月更文挑战第26天】在大数据处理领域,阿里云EMR与DeltaLake的集成增强了数据处理能力。进一步结合阿里云DLF服务,实现了数据湖的一站式管理,自动化处理元数据及权限控制,简化管理流程。集成后的方案提升了数据安全性、可靠性和性能优化水平,让用户更专注业务价值。这一集成标志着数据湖技术向着自动化、安全和高效的未来迈出重要一步。
56 2
|
2月前
|
存储 分布式计算 大数据
阿里云 EMR 强势助力,与阿里云大数据体系共创辉煌,把握时代热点,开启生态建设之旅
【8月更文挑战第26天】阿里云EMR(Elastic MapReduce)是一种大数据处理服务,与阿里云的多个服务紧密结合,共同构建了完善的大数据生态系统。EMR与对象存储服务(OSS)集成,利用OSS提供可靠、低成本且可扩展的数据存储;与MaxCompute集成,实现深度数据分析和挖掘;还支持数据湖构建服务,加速数据湖的搭建并简化数据管理与分析过程。EMR提供多种编程接口及工具,如Hive、Spark和Flink等,帮助用户高效完成大数据处理任务。
66 2
|
3月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
2月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
3月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

热门文章

最新文章