flink写入数据到hudi的四种方式

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 本文描述flink写入数据到hudi的四种方式

## 总览 ![](https://ucc.alicdn.com/notfound.png) ## bulk_insert 用于快速导入快照数据到hudi。 ### 基本特性 bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会`跳过数据去重`,所以用户需要保证数据的唯一性。 bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。 bulk_insert的并行度有`write.tasks`参数指定,并行度会影响小文件的数量。理论上来说,bulk_insert的并行度就是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量将大于参数`write.bucket.assign.tasks`指定的数量 ) ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`write.operation`|`true`|`upsert`|设置为 `bulk_insert` 以开启bulk_insert功能| |`write.tasks`|`false`|`4`| `bulk_insert` 并行度, `the number of files` >= [`write.bucket_assign.tasks`](https://hudi.apache.org/cn/docs/flink-quick-start-guide/#parallelism )| |`write.bulk_insert.shuffle_by_partition`|`false`|`true`|写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,但可能存在数据倾斜的风险  | |`write.bulk_insert.sort_by_partition`|`false`|`true`|写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量  | |`write.sort.memory`|`false`|`128`|排序算子的可用托管内存。 默认为 `128` MB| ### Flink SQL实践 使用datafaker生成100000条数据,放到mysql数据库中的stu4表。 使用bulk_insert方式写入到hudi中。 Flink SQL client 创建myql数据源 ```SQL create table stu4(   id bigint not null,   name string,   school string,   nickname string,   age int not null,   score decimal(4,2) not null,   class_num int not null,   phone bigint not null,   email string,   ip string,   PRIMARY KEY (id) NOT ENFORCED ) with (   'connector' = 'jdbc',   'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',   'username' = 'root',   'password' = 'Pass-123-root',   'table-name' = 'stu4' ); ``` 创建hudi表 ```SQL  create table stu4_sink_hudi(   id bigint not null,   name string,   `school` string,   nickname string,   age int not null,  score decimal(4,2) not null,   class_num int not null,   phone bigint not null,   email string,   ip string,   primary key (id) not enforced )  partitioned by (`school`)  with (   'connector' = 'hudi',   'path' = 'hdfs://hadoop:9000/tmp/stu4_sink_hudi',   'table.type' = 'MERGE_ON_READ',   'write.option' = 'bulk_insert',   'write.precombine.field' = 'school'   ); ``` mysql数据插入到hudi中 ```sql insert into stu4_sink_hudi select * from stu4; ``` 执行时间 ![](https://ucc.alicdn.com/notfound.png) ## Index bootstrap ### 基本特性 该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。 温馨提示: 如果你觉得这个过程特别耗时,那么你在写快照数据的时候可以多设置计算资源,然后在插入增量数据时减少计算资源。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`index.bootstrap.enabled`|`true`|`false`|当启用index bootstrap功能时,会将Hudi表中的剩余记录一次性加载到Flink状态中| |`index.partition.regex`|`false`|`*`|优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到flink状态  | ### 使用方法 1. CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。 2. 设置`index.bootstrap.enabled = true`来启用index bootstrap功能 3. 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项`execution.checkpointing.tolerable-failed-checkpoints = n`(取决于Flink checkpoint执行时间) 4. 等待直到第一个checkpoint成功,表明index bootstrap完成。 5. 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。 6. 重启任务,并且设置`index.bootstrap.enable` 为 `false` 温馨提示: 1. 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。 2. index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。 3. index bootstrap是并发执行的。用户可以在日志文件中通过`finish loading the index under partition`以及`Load record form file`观察index bootstrap的进度。 4. 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。 ### Flink SQL实践 前提条件: 1. 已有50w条数据已写入kafka,使用bulk_insert的方式将其导入hudi表。 2. 再通过创建任务消费最新kafka数据,并开启index bootstrap特性。 创建bulk_insert任务: ```sql create table stu3_binlog_source_kafka(   id bigint not null,   name string,   school string,   nickname string,   age int not null,   class_num int not null,   phone bigint not null,   email string,   ip string  ) with (   'connector' = 'kafka',   'topic' = 'cdc_mysql_stu3_sink_test',   'properties.bootstrap.servers' = 'hadoop1:9092',   'format' = 'debezium-json',   'scan.startup.mode' = 'earliest-offset',   'properties.group.id' = 'testGroup_20210929_4'   );     create table stu3_binlog_sink_hudi(   id bigint not null,   name string,   `school` string,   nickname string,   age int not null,   class_num int not null,   phone bigint not null,   email string,   ip string,   primary key (id) not enforced )  partitioned by (`school`)  with (   'connector' = 'hudi',   'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',   'table.type' = 'MERGE_ON_READ',   'write.option' = 'bulk_insert',   'write.precombine.field' = 'school'   );    insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka; ``` 创建开启index bootstrap特性、离线压缩任务。 ```sql create table stu3_binlog_source_kafka_1(   id bigint not null,   name string,   school string,   nickname string,   age int not null,   class_num int not null,   phone bigint not null,   email string,   ip string  ) with (   'connector' = 'kafka',   'topic' = 'cdc_mysql_stu3_sink_test',   'properties.bootstrap.servers' = 'hadoop1:9092',   'format' = 'debezium-json',   'scan.startup.mode' = 'latest-offset',   'properties.group.id' = 'testGroup_20210929_4'   );       create table stu3_binlog_sink_hudi_1(   id bigint not null,   name string,   `school` string,   nickname string,   age int not null,   class_num int not null,   phone bigint not null,   email string,   ip string,   primary key (id) not enforced )  partitioned by (`school`)  with (   'connector' = 'hudi',   'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',   'table.type' = 'MERGE_ON_READ',   'write.option' = 'upsert',   'write.tasks' = '4',   'write.precombine.field' = 'school',   'compaction.async.enabled' = 'false',    'index.bootstrap.enabled' = 'true'   );    insert into stu3_binlog_sink_hudi_1 select * from stu3_binlog_source_kafka_1; ``` 提交离线压缩任务: ```bash ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.9.0.jar --path hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4 ``` 创建bulk_insert任务: ## Changelog Mode ### 基本特性 Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`changelog.enabled`|`false`|`false`|它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改  | 温馨提示: 不管格式是否存储了中间更改日志消息,批处理(快照)读取仍然合并所有中间更改。 在设置changelog.enable为true时,更新日志记录的保留只是最大的努力: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commits和compression.delta_seconds,为读取器保留一些缓冲时间。 ## Insert Mode ### 基本特性 默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并基本parquet文件(增量数据集将被重复数据删除)。 这种策略会导致性能下降。 如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。 每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`write.insert.deduplicate`|`false`|`true`|“插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件  |

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
13天前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
106 43
|
3月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
283 61
|
4月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
4月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
119 1
|
4月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
83 1
|
4月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
80 0
|
4月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
70 0
|
5月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
117 2
|
6月前
|
调度 流计算
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型

热门文章

最新文章