使用flink SQL Client将mysql数据写入到hudi并同步到hive

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 本文介绍如何使用flink SQL Client将mysql数据写入到hudi并同步到hive

测试环境

组件版本

  • mysql 5.7
  • hive 3.1.2
  • flink 1.12.2
  • hudi 0.9.0
  • hadoop 3.2.0

首先请确保以下组件正常启动:

  • mysql
  • hivemetastore
  • hiveserver2
  • hdfs
  • yarn

hudi适配hive 3.1.2源码编译

0.9.0版本的hudi在适配hive3时,其hudi/package/hudi-flink-bundle/pom.xml文件使用的flink-connector-hive版本有问题,所以需要修改pom文件。

修改点一:

143行,修改为:

<include>org.apache.flink:flink-sql-connector-hive-${hive.version}_${scala.binary.version}</include>

642行,修改为:

<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>

编译命令:

mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop.version=3.2.0 -Dhive.version=3.1.2 -Pinclude-flink-sql-connector-hive -U -Dscala.version=2.12.10 -Dscala.binary.version=2.12

将编译后得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2.12-0.9.0.jar拷贝到flink/lib目录下,将得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0.jar拷贝到hive/auxlib目录下,如果没有这个目录则新建一个即可。

关于flink操作hudi的相关方法如果有疑惑的可先看本系列的其他文章,例如使用flink插入数据到hudi数据湖初探Flink SQL Client实战CDC数据入湖等。

生成测试数据

使用datafaker生成100000条数据,放到mysql数据库中的stu4表。

datafaker工具使用方法见datafaker --- 测试数据生成工具

首先在mysql中新建表test.stu4

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;

新建meta.txt文件,文件内容为:

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

生成10000条数据并写入到mysql中的test.stu4表

datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 --meta meta.txt 

datafaker工具有详细使用方法,请参考。

导入mysql数据

使用flink sql client进行如下操作

构建源表

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'table-name' = 'stu4'
);

构建目标表

 create table stu4_tmp_1(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu4_tmp_1',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'school',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
  'hive_sync.table' = 'stu4_tmp_1',
  'hive_sync.db' = 'test',
  'hive_sync.username' = 'hive',
  'hive_sync.password' = 'hive'
  );

插入数据

insert into stu4_tmp_1 select * from stu4;

hive数据查询

使用hive命令进入hive cli

执行如下命令查询数据

select * from test.stu4_tmp_1 limit 10;

结果:

了解更多

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
11天前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
31 6
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
36 2
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何使用Flink SQL连接带有Kerberos认证的Hive
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 MySQL 数据安全/隐私保护
Navicat连接mysql8报错解决:1251- Client does not support authentication protocol requested by server
Navicat连接mysql8报错解决:1251- Client does not support authentication protocol requested by server
57 0
|
2月前
|
消息中间件 存储 SQL
实时计算 Flink版产品使用问题之kafka2hive同步数据时,如何回溯历史数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6天前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
20 0
|
1天前
|
SQL 存储 关系型数据库
数据库-MySQL-01(一)
数据库-MySQL-01(一)
12 4
|
6天前
|
缓存 NoSQL Redis
一天五道Java面试题----第九天(简述MySQL中索引类型对数据库的性能的影响--------->缓存雪崩、缓存穿透、缓存击穿)
这篇文章是关于Java面试中可能会遇到的五个问题,包括MySQL索引类型及其对数据库性能的影响、Redis的RDB和AOF持久化机制、Redis的过期键删除策略、Redis的单线程模型为何高效,以及缓存雪崩、缓存穿透和缓存击穿的概念及其解决方案。