1. 上游数据准备
上游数据以mysql为例作为hudi入湖的上游应用表,详细的建表、插入及更新语句如下
droptable if exists `sunyf_db`.`flink_test_02_hudi`;createtable if not EXISTS `sunyf_db`.`flink_test_02_hudi`( id BIGINTnotnull PRIMARY KEY ,`name` varchar(20),datime TIMESTAMP,price FLOAT);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(1,'zhao', CURRENT_TIMESTAMP,20);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(2,'qian', CURRENT_TIMESTAMP,30);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(3,'sun', CURRENT_TIMESTAMP,40);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(4,'li', CURRENT_TIMESTAMP,50);INSERTINTO `sunyf_db`.`flink_test_02_hudi` values(5,'zhou', CURRENT_TIMESTAMP,60);select*from `sunyf_db`.`flink_test_02_hudi` update `sunyf_db`.`flink_test_02_hudi` set `name` ='sunyf35'where id =3;
2. Flink-SQL任务
flink-hudi-connector参数参考:https://hudi.apache.org/docs/basic_configurations#Flink-Options
--********************************************************************---- Author: sunyf-- Created Time: 2023-07-03 17:52:47-- Description: Write your description here-- Hints: You can use SET statements to modify the configuration--********************************************************************--CREATE TEMPORARY TABLE mysqlcdc_source ( id bigint,`name` STRING ,datime TIMESTAMP,price float,PRIMARY KEY (id)NOT ENFORCED )WITH ('connector'='mysql','hostname'='rm-xxx.mysql.rds.aliyuncs.com','port'='3306','username'='用户名','password'='密码','database-name'='sunyf_db','table-name'='flink_test_02_hudi');CREATE TEMPORARY TABLE hudi_sink ( id bigintNOTNULL,`name` STRING ,`price` float,PRIMARY KEY (id)NOT ENFORCED -- 或者直接在id字段后面写 id int not null primary key 也可以-- 或者 指定如下参数 hoodie.datasource.write.recordkey.field)WITH ('connector'='hudi','oss.endpoint'='oss-cn-xx-internal.aliyuncs.com','accessKeyId'='用户名','accessKeySecret'='密码','path'='oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_test_03_hudi_cow','table.type'='COPY_ON_WRITE'--MERGE_ON_READ,'write.operation'='insert'-- ,'precombine.field' = 'ts'-- ***************-- 下面的元数据相关参数如果不指定的话--hive_sync.enable=false,仅落地文件,需要参考(标题3)中的两种方案建表,'hive_sync.db'='emr_hudi_cluster0630','hive_sync.table'='flink_test_03_hudi_cow','hive_sync.enable'='true','hive_sync.mode'='hms','dlf.catalog.endpoint'='dlf-vpc.cn-beijing.aliyuncs.com','dlf.catalog.region'='cn-beijing'-- ***************-- 在线压缩参数-- ,'compaction.async.enabled' = 'false'-- 指定分区字段,'hoodie.datasource.write.partitionpath.field'='name'-- 指定使用hive的分区表达方式来展示-- 文件夹名为:(day=20230101),不然的话分在oss上展示为20230101,'hoodie.datasource.write.hive_style_partitioning'='true');CREATE TEMPORARY TABLE print_sink ( id bigintNOTNULL,`name` STRING ,datime TIMESTAMP,`price` float,PRIMARY KEY (id)NOT ENFORCED )WITH ('connector'='print','logger'='true','print-identifier'='sunyfOutputFormat');BEGIN STATEMENT SET;INSERTINTO hudi_sink SELECTid ,`name` ,price FROM mysqlcdc_source ;INSERTINTO print_sink SELECT*FROM mysqlcdc_source ;end;
3. 建表方案
3.1. 创建hive外表读hudi文件
-- 使用hive外表的时候可以不指定flink hudi sink表中hive_sync 相关参数-- 直接采用外表的属性即可,这个外表的数据没有hudi的元数据字段createtable flink_sink_hudi_externel ( id bigintnotnull primary key ,`name` string ,price float)ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION 'oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_sink_hudi';
3.2. 创建spark hudi外表
# /bin/bash on spark 3.xspark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'# sql on spark use emr_hudi_cluster0630;# 指定location在oss而非默认的oss-hdfs((标题4)中有相关说明)createtable emr_hudi_cluster0630.spark_hudi_location_cow( id bigint, name string, price float)using hudi options(type ='cow',primaryKey='id')location 'oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/spark_hudi_location_cow';-- 为了避免字段类型不一致,导致的spark报错,这里伪造的原始数据都进行了一下cast-- 有些值cast过后会有精度的损失,hiveinsertinto emr_hudi_cluster0630.spark_hudi_location_cowvalues(cast(1asbigint),'sun', cast(999.2asfloat)),(cast(2asbigint),'li', cast(12.1asfloat));
3.3. 外表的删除
# 任一客户端执行:droptable emr_hudi_cluster0630.flink_test_02_hudi_cow;# 外表要手动删除oss上的文件,drop表仅drop元数据,不会删除数据文件 hdfs dfs -rm -f -r oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_test_02_hudi_cow
4. hudi表更新
通过修改参数实现cow表的实时更新
'write.operation' = 'insert'
'write.operation' = 'upsert'
5. 踩坑问题
5.1. spark创建hudi表
创建的时候指定location,desc的时候就会看到表是EXTERNAL 而不是 MANAGED,要注意表删除时相关数据是否留存或者删表重建,数据异常的问题
5.2. spark与hive获取元数据差异
在oss或者通过hdfs dfs命令对hudi表的文件进行删除或者更改后,维持当前的spark-sql-session是不能获取到这一元数据更改的,select查询该表会报文件不存在的异常,需要重启spark-sql客户端或者
refresh table[table_name]
进行元数据更新,hive侧正常执行。
5.3. 字段类型报错
hudi表字段类型与flink的表结构中字段类型要对应,否则在某些增量(手动写入+flink写入)场景下,可能导致数据读取异常(spark),hive兼容性较好,会进行字段类型的隐式转换,但是会有精度损失,如double->float
5.4. flink hudi con对oss-hdfs支持问题
- 根据dlf中表存储的path(图3)直接添加到with参数(图1)中,会报非法参数异常(图2)
- 尝试:将oss-hdfs的路径后面的endpoint更改到endpoint参数中,jobmanager可以正常启动,但是写下的文件还是不在该内表指定的oss-hdfs存储路径下(日志中可以观察到,图4),oss上验证文件写入情况,如(图5,6)
- 查了下oss-hdfs的文档,应该是目前还不支持这个connector直接写默认存储在oss-hdfs的表。
- 参考文档:https://help.aliyun.com/document_detail/419069.html?spm=a2c4g.427753.0.i0
- 参考文档:https://help.aliyun.com/document_detail/471050.html?spm=a2c4g.141562.0.i6
- 绕行方案:在hudi建表的时候指定location在oss正常的bucket路径中,而不是oss-hdfs,可以正常使用
5.5. oss目录未删除问题
oss-hdfs 删除的db未删除(图1),dlf中已删除(图2 show databases 与dlf结果一致)
hdfs dfs -rm -f -r oss://sunyf-oss-emr02-hudi/emr_hudi_cluster0630.db/flink_sink_hudi_location