## 总览 ![](https://ucc.alicdn.com/notfound.png) ## bulk_insert 用于快速导入快照数据到hudi。 ### 基本特性 bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会`跳过数据去重`,所以用户需要保证数据的唯一性。 bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。 bulk_insert的并行度有`write.tasks`参数指定,并行度会影响小文件的数量。理论上来说,bulk_insert的并行度就是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量将大于参数`write.bucket.assign.tasks`指定的数量 ) ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`write.operation`|`true`|`upsert`|设置为 `bulk_insert` 以开启bulk_insert功能| |`write.tasks`|`false`|`4`| `bulk_insert` 并行度, `the number of files` >= [`write.bucket_assign.tasks`](https://hudi.apache.org/cn/docs/flink-quick-start-guide/#parallelism )| |`write.bulk_insert.shuffle_by_partition`|`false`|`true`|写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,但可能存在数据倾斜的风险 | |`write.bulk_insert.sort_by_partition`|`false`|`true`|写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量 | |`write.sort.memory`|`false`|`128`|排序算子的可用托管内存。 默认为 `128` MB| ### Flink SQL实践 使用datafaker生成100000条数据,放到mysql数据库中的stu4表。 使用bulk_insert方式写入到hudi中。 Flink SQL client 创建myql数据源 ```SQL create table stu4( id bigint not null, name string, school string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, PRIMARY KEY (id) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8', 'username' = 'root', 'password' = 'Pass-123-root', 'table-name' = 'stu4' ); ``` 创建hudi表 ```SQL create table stu4_sink_hudi( id bigint not null, name string, `school` string, nickname string, age int not null, score decimal(4,2) not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) partitioned by (`school`) with ( 'connector' = 'hudi', 'path' = 'hdfs://hadoop:9000/tmp/stu4_sink_hudi', 'table.type' = 'MERGE_ON_READ', 'write.option' = 'bulk_insert', 'write.precombine.field' = 'school' ); ``` mysql数据插入到hudi中 ```sql insert into stu4_sink_hudi select * from stu4; ``` 执行时间 ![](https://ucc.alicdn.com/notfound.png) ## Index bootstrap ### 基本特性 该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。 温馨提示: 如果你觉得这个过程特别耗时,那么你在写快照数据的时候可以多设置计算资源,然后在插入增量数据时减少计算资源。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`index.bootstrap.enabled`|`true`|`false`|当启用index bootstrap功能时,会将Hudi表中的剩余记录一次性加载到Flink状态中| |`index.partition.regex`|`false`|`*`|优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到flink状态 | ### 使用方法 1. CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。 2. 设置`index.bootstrap.enabled = true`来启用index bootstrap功能 3. 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项`execution.checkpointing.tolerable-failed-checkpoints = n`(取决于Flink checkpoint执行时间) 4. 等待直到第一个checkpoint成功,表明index bootstrap完成。 5. 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。 6. 重启任务,并且设置`index.bootstrap.enable` 为 `false` 温馨提示: 1. 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。 2. index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。 3. index bootstrap是并发执行的。用户可以在日志文件中通过`finish loading the index under partition`以及`Load record form file`观察index bootstrap的进度。 4. 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。 ### Flink SQL实践 前提条件: 1. 已有50w条数据已写入kafka,使用bulk_insert的方式将其导入hudi表。 2. 再通过创建任务消费最新kafka数据,并开启index bootstrap特性。 创建bulk_insert任务: ```sql create table stu3_binlog_source_kafka( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string ) with ( 'connector' = 'kafka', 'topic' = 'cdc_mysql_stu3_sink_test', 'properties.bootstrap.servers' = 'hadoop1:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'testGroup_20210929_4' ); create table stu3_binlog_sink_hudi( id bigint not null, name string, `school` string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) partitioned by (`school`) with ( 'connector' = 'hudi', 'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4', 'table.type' = 'MERGE_ON_READ', 'write.option' = 'bulk_insert', 'write.precombine.field' = 'school' ); insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka; ``` 创建开启index bootstrap特性、离线压缩任务。 ```sql create table stu3_binlog_source_kafka_1( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string ) with ( 'connector' = 'kafka', 'topic' = 'cdc_mysql_stu3_sink_test', 'properties.bootstrap.servers' = 'hadoop1:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'testGroup_20210929_4' ); create table stu3_binlog_sink_hudi_1( id bigint not null, name string, `school` string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) partitioned by (`school`) with ( 'connector' = 'hudi', 'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4', 'table.type' = 'MERGE_ON_READ', 'write.option' = 'upsert', 'write.tasks' = '4', 'write.precombine.field' = 'school', 'compaction.async.enabled' = 'false', 'index.bootstrap.enabled' = 'true' ); insert into stu3_binlog_sink_hudi_1 select * from stu3_binlog_source_kafka_1; ``` 提交离线压缩任务: ```bash ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.9.0.jar --path hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4 ``` 创建bulk_insert任务: ## Changelog Mode ### 基本特性 Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`changelog.enabled`|`false`|`false`|它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改 | 温馨提示: 不管格式是否存储了中间更改日志消息,批处理(快照)读取仍然合并所有中间更改。 在设置changelog.enable为true时,更新日志记录的保留只是最大的努力: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commits和compression.delta_seconds,为读取器保留一些缓冲时间。 ## Insert Mode ### 基本特性 默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并基本parquet文件(增量数据集将被重复数据删除)。 这种策略会导致性能下降。 如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。 每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。 ### 可选配置参数 |参数名称|是否必须|默认值|参数说明| |---|---|---|---| |`write.insert.deduplicate`|`false`|`true`|“插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件 |