flink写入数据到hudi的四种方式

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 本文描述flink写入数据到hudi的四种方式

总览

bulk_insert

用于快速导入快照数据到hudi。

基本特性

bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。

bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。

bulk_insert的并行度有write.tasks参数指定,并行度会影响小文件的数量。理论上来说,bulk_insert的并行度就是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量将大于参数write.bucket.assign.tasks指定的数量 )

可选配置参数

参数名称 是否必须 默认值 参数说明
write.operation true upsert 设置为 bulk_insert 以开启bulk_insert功能
write.tasks false 4 bulk_insert 并行度, the number of files >= write.bucket_assign.tasks
write.bulk_insert.shuffle_by_partition false true 写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,但可能存在数据倾斜的风险
write.bulk_insert.sort_by_partition false true 写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量
write.sort.memory false 128 排序算子的可用托管内存。 默认为 128 MB

Flink SQL实践

使用datafaker生成100000条数据,放到mysql数据库中的stu4表。

数据生成方式以及Flink SQL使用方法见Flink SQL Client实战CDC数据入湖

使用bulk_insert方式写入到hudi中。

Flink SQL client 创建myql数据源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'table-name' = 'stu4'
);

创建hudi表

 create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );

mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

执行时间

Index bootstrap

基本特性

该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。

温馨提示:

如果你觉得这个过程特别耗时,那么你在写快照数据的时候可以多设置计算资源,然后在插入增量数据时减少计算资源。

可选配置参数

参数名称 是否必须 默认值 参数说明
index.bootstrap.enabled true false 当启用index bootstrap功能时,会将Hudi表中的剩余记录一次性加载到Flink状态中
index.partition.regex false * 优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到flink状态

使用方法

  1. CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。
  2. 设置index.bootstrap.enabled = true来启用index bootstrap功能
  3. 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项execution.checkpointing.tolerable-failed-checkpoints = n(取决于Flink checkpoint执行时间)
  4. 等待直到第一个checkpoint成功,表明index bootstrap完成。
  5. 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。
  6. 重启任务,并且设置index.bootstrap.enablefalse

温馨提示:

  1. 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。
  2. index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。
  3. index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度。
  4. 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。

Flink SQL实践

前提条件:

  1. 已有50w条数据已写入kafka,使用bulk_insert的方式将其导入hudi表。
  2. 再通过创建任务消费最新kafka数据,并开启index bootstrap特性。

如果未将数据导入kafka可使用Flink SQL Client实战CDC数据入湖文章提供的方法将数据导入kafka。

创建bulk_insert任务:

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink_test',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup_20210929_4'
  );
  
 create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );
  
insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;

创建开启index bootstrap特性、离线压缩任务。

create table stu3_binlog_source_kafka_1(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink_test',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'testGroup_20210929_4'
  );
  
   create table stu3_binlog_sink_hudi_1(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'upsert',
  'write.tasks' = '4',
  'write.precombine.field' = 'school',
  'compaction.async.enabled' = 'false', 
  'index.bootstrap.enabled' = 'true'
  );
  
insert into stu3_binlog_sink_hudi_1 select * from stu3_binlog_source_kafka_1;

提交离线压缩任务:

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.9.0.jar --path hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4

创建bulk_insert任务:

Changelog Mode

基本特性

Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。

可选配置参数

参数名称 是否必须 默认值 参数说明
changelog.enabled false false 它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改

温馨提示:

不管格式是否存储了中间更改日志消息,批处理(快照)读取仍然合并所有中间更改。

在设置changelog.enable为true时,更新日志记录的保留只是最大的努力: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commits和compression.delta_seconds,为读取器保留一些缓冲时间。

Insert Mode

基本特性

默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并基本parquet文件(增量数据集将被重复数据删除)。 这种策略会导致性能下降。

如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。 每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。

可选配置参数

参数名称 是否必须 默认值 参数说明
write.insert.deduplicate false true “插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4天前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
17 2
|
1天前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle Java
实时计算 Flink版产品使用问题之采集Oracle数据时,为什么无法采集到其他TABLESPACE的表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle 数据处理
实时计算 Flink版产品使用问题之如何优化数据读取速度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
分布式计算 Oracle 关系型数据库
实时计算 Flink版产品使用问题之获取Oracle的数据时无法获取clob类型的数据,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
Kubernetes 关系型数据库 API
实时计算 Flink版产品使用问题之连接的PG表长时间无数据写入,WAL日志持续增长,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
存储 Java 关系型数据库
实时计算 Flink版产品使用问题之以jar包方式同步数据是否需要定义存储oss的位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5天前
|
消息中间件 分布式计算 Kafka
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
12 0
|
5天前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
12 0