使用Sqoop导出Mysql数据到Hive(实战案例)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用Sqoop导出Mysql数据到Hive(实战案例)

0x00 教程内容


  1. SQL文件准备
  2. 导出Mysql数据到Hive

前提:

  1. 安装好了Hive
  2. 安装好了Sqoop


0x01 SQL文件准备


1. 准备sql脚本

a. 准备建表与添加数据sql文件


image.png


2. 执行sql脚本

source /home/hadoop/scripts/sql/init_exec.sql

PS:文件多的话,可能执行起来会有点慢。


0x02 导出Mysql数据到Hive


1. 导出数据到HDFS

a. 将Mysql中带有电影种类信息的电影数据导入到HDFS中

sqoop import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--query 'SELECT movie.*, group_concat(genre.name) FROM movie
JOIN movie_genre ON (movie.id = movie_genre.movie_id)
JOIN genre ON (movie_genre.genre_id = genre.id)
WHERE $CONDITIONS
GROUP BY movie.id' \
--split-by movie.id \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/movie \
--delete-target-dir


执行后,/user/hadoop/movielens/movie目录下就有数据了。

hadoop fs -ls /user/hadoop/movielens/movie


image.png


2. 构建Hive表关联HDFS(movie表)

a. 创建一个movielens数据库:

CREATE DATABASE IF NOT EXISTS movielens;

b. 创建一张临时存放数据的表:


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie_temp(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
LOCATION '/user/hadoop/movielens/movie';


建完表,movie_temp表就已经有数据了!

select count(*) from movie_temp;


image.png


c. 创建一张真实存放数据的表(将临时表的数据导入到这里来)


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>)
STORED AS PARQUET;


d. 将临时表的数据导入到这里来

INSERT OVERWRITE TABLE movielens.movie SELECT * FROM movielens.movie_temp;

查看movie表有没有数据:

select count(*) from movie;


image.png


e. 最后删除临时表

DROP TABLE movielens.movie_temp;


3. 构建Hive表关联HDFS(user_rating表)

a. 启动Sqoop的metastore服务:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


## 增量的将user_rating的数据从mysql中导入到hdfs中
sqoop job --create user_rating_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--table user_rating -m 5 --split-by dt_time \
--incremental append --check-column dt_time \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_rating_stage


报错:

image.png


解决:

拷贝java-json.jar包放于$SQOOP_HOME/lib路径下:

image.png


c. 执行user_rating_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_rating_stage


image.png

d. 创建一张临时存放数据的表:


DROP TABLE IF EXISTS user_rating_stage;
CREATE EXTERNAL TABLE user_rating_stage (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_rating_stage';


select count(*) from user_rating_stage;

image.png


e. 创建一张以后会含有重复评分的user_rating表

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 挑选user_rating_stage的相应字段插入到user_rating表:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


查看一下是否有数据:

select count(*) from user_rating;

image.png


g. 创建评分事实表(没有重复数据的)

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating_fact (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;


将user_rating表进行去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


查看数据,因为表数据没改变,所以都是10w条:

select count(*) from user_rating_fact;

image.png


3.1 模拟增量添加数据操作

a. 打开一个终端进入Mysql,插入数据:

insert into user_rating values(0, 196, 242, 5, STR_TO_DATE("1970-01-12 12:47:30", "%Y-%m-%d %T"));
insert into user_rating values(1, 186, 302, 4, STR_TO_DATE("1970-01-12 15:41:57", "%Y-%m-%d %T"));
insert into user_rating values(2, 22, 377, 3, STR_TO_DATE("1970-01-12 12:08:07", "%Y-%m-%d %T"));
insert into user_rating values(3, 244, 51, 1, STR_TO_DATE("1970-01-12 12:36:46", "%Y-%m-%d %T"));
insert into user_rating values(4, 166, 346, 1, STR_TO_DATE("1970-01-12 14:13:17", "%Y-%m-%d %T"));
insert into user_rating values(5, 298, 474, 4, STR_TO_DATE("1970-01-12 13:36:22", "%Y-%m-%d %T"));
insert into user_rating values(6, 115, 265, 2, STR_TO_DATE("1970-01-12 12:46:11", "%Y-%m-%d %T"));
insert into user_rating values(7, 253, 465, 5, STR_TO_DATE("1970-01-12 15:40:28", "%Y-%m-%d %T"));
insert into user_rating values(8, 305, 451, 3, STR_TO_DATE("1970-01-12 14:12:04", "%Y-%m-%d %T"));


b. 重新执行user_rating_import任务

要先删除user_rating_stage路径下的内容:

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

重新执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -cat /user/hadoop/movielens/user_rating_stage/*


image.png


只有我们插入的那几条数据!

c. 将新增的数据插入到评分记录表中:

INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


此时评分记录表含有100000+9=1000009条记录

select count(*) from user_rating;

image.png


d. 评分记录表重新操作可以得到评分的实时表:

操作之前看一下我们Mysql的记录,其实我们是已经插入了另外一条了,等一下我们要进行去重操作,最后才会得到我们的评分事实表(user_rating_fact):


image.png


e. 去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


select count(*) from user_rating_fact;

select * from user_rating_fact where id = 1;

image.png


发现,我们id为1的这条记录的评分,是拿了最新的那个时间的,即评分为4的。


4. 构建Hive表关联HDFS(users表)

a. 前提已启动Sqoop的metastore服务,否则:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


sqoop job --create user_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
-m 5 --split-by last_modified \
--incremental lastmodified --check-column last_modified \
--query 'SELECT user.id, user.gender, user.zip_code, user.age, occupation.occupation, user.last_modified
FROM user JOIN occupation ON user.occupation_id = occupation.id WHERE $CONDITIONS' \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_stage 


c. 执行user_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_stage

执行任务:

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_stage


image.png


d. 创建一张临时用户表user_stage:

DROP TABLE IF EXISTS user_stage;
CREATE EXTERNAL TABLE user_stage (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_stage';


e. 创建用户历史表user_rating_history:

CREATE EXTERNAL TABLE IF NOT EXISTS user_history (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 将临时表的数据先插入到user_history表:

INSERT INTO TABLE user_history PARTITION(year, month, day)
SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


执行完,我们的user_history表的数据就是Mysql未改变前的数据。

g. 创建一张用户表users(不要用user,因为可能会跟内置的用户冲突):


CREATE EXTERNAL TABLE IF NOT EXISTS users (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
STORED AS PARQUET;


合并修改更新后的user(左连接:排除users表中修改过的数据,等下直接拿user_stage倒进来就可以了)

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;

第一次没有修改,也没有添加,所以users表数据与user_stage一样。


4.1 模拟修改与增量数据操作

a. 更新Mysql的数据:


update user set gender='F', last_modified=now() where id=1;
insert into user values(944, 'F', "345642", 20, 19, now());


b. 同步Mysql数据到HDFS:

hadoop fs -rm -r /user/hadoop/movielens/user_stage

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码即可同步!

hadoop fs -cat /user/hadoop/movielens/user_stage/*


image.png


c. 插入数据到user_history:

INSERT INTO TABLE user_history PARTITION(year, month, day) SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


d. 合并修改更新后的user

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


image.png


原本943条条记录,进过上面命令,排除了1条修改过的记录,还有943-1=942条。

e. 插入临时表里面的数据:

INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;


image.png


942+2=944条记录。


0xFF 总结


  1. 过程繁琐,写教程难度大,写了好几个钟,辛苦了。
  2. 本教程的特色是加上我们的增量、已经修改增量,比较贴切企业开发每天的数据。
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
16天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
82 1
|
21天前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
48 5
|
1月前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
69 1
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
50 3
|
16天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
47 0
|
1月前
|
XML 关系型数据库 MySQL
MySQL 导出某些数据的技术详解
MySQL 导出某些数据的技术详解
127 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
84 0
|
6月前
|
SQL 分布式计算 监控
Sqoop数据迁移工具使用与优化技巧:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入解析Sqoop的使用、优化及面试策略。内容涵盖Sqoop基础,包括安装配置、命令行操作、与Hadoop生态集成和连接器配置。讨论数据迁移优化技巧,如数据切分、压缩编码、转换过滤及性能监控。此外,还涉及面试中对Sqoop与其他ETL工具的对比、实际项目挑战及未来发展趋势的讨论。通过代码示例展示了从MySQL到HDFS的数据迁移。本文旨在帮助读者在面试中展现Sqoop技术实力。
467 2
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
1416 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
37 0