使用flink SQL Client将mysql数据写入到hudi并同步到hive

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍如何使用flink SQL Client将mysql数据写入到hudi并同步到hive

测试环境

组件版本

  • mysql 5.7
  • hive 3.1.2
  • flink 1.12.2
  • hudi 0.9.0
  • hadoop 3.2.0

首先请确保以下组件正常启动:

  • mysql
  • hivemetastore
  • hiveserver2
  • hdfs
  • yarn

hudi适配hive 3.1.2源码编译

0.9.0版本的hudi在适配hive3时,其hudi/package/hudi-flink-bundle/pom.xml文件使用的flink-connector-hive版本有问题,所以需要修改pom文件。

修改点一:

143行,修改为:

<include>org.apache.flink:flink-sql-connector-hive-${hive.version}_${scala.binary.version}</include>

642行,修改为:

<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>

编译命令:

mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop.version=3.2.0 -Dhive.version=3.1.2 -Pinclude-flink-sql-connector-hive -U -Dscala.version=2.12.10 -Dscala.binary.version=2.12

将编译后得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2.12-0.9.0.jar拷贝到flink/lib目录下,将得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0.jar拷贝到hive/auxlib目录下,如果没有这个目录则新建一个即可。

关于flink操作hudi的相关方法如果有疑惑的可先看本系列的其他文章,例如使用flink插入数据到hudi数据湖初探Flink SQL Client实战CDC数据入湖等。

生成测试数据

使用datafaker生成100000条数据,放到mysql数据库中的stu4表。

datafaker工具使用方法见datafaker --- 测试数据生成工具

首先在mysql中新建表test.stu4

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;

新建meta.txt文件,文件内容为:

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

生成10000条数据并写入到mysql中的test.stu4表

datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 --meta meta.txt 

datafaker工具有详细使用方法,请参考。

导入mysql数据

使用flink sql client进行如下操作

构建源表

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'table-name' = 'stu4'
);

构建目标表

 create table stu4_tmp_1(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu4_tmp_1',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'school',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
  'hive_sync.table' = 'stu4_tmp_1',
  'hive_sync.db' = 'test',
  'hive_sync.username' = 'hive',
  'hive_sync.password' = 'hive'
  );

插入数据

insert into stu4_tmp_1 select * from stu4;

hive数据查询

使用hive命令进入hive cli

执行如下命令查询数据

select * from test.stu4_tmp_1 limit 10;

结果:

了解更多

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
27天前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
157 61
|
8天前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
37 6
|
19天前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
25天前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
1月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
64 14
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1163 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
155 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
862 7
阿里云实时计算Flink在多行业的应用和实践
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

推荐镜像

更多
下一篇
DataWorks