使用Sqoop导出Mysql数据到Hive(实战案例)

本文涉及的产品
RDS AI 助手,专业版
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 使用Sqoop导出Mysql数据到Hive(实战案例)

0x00 教程内容


  1. SQL文件准备
  2. 导出Mysql数据到Hive

前提:

  1. 安装好了Hive
  2. 安装好了Sqoop


0x01 SQL文件准备


1. 准备sql脚本

a. 准备建表与添加数据sql文件


image.png


2. 执行sql脚本

source /home/hadoop/scripts/sql/init_exec.sql

PS:文件多的话,可能执行起来会有点慢。


0x02 导出Mysql数据到Hive


1. 导出数据到HDFS

a. 将Mysql中带有电影种类信息的电影数据导入到HDFS中

sqoop import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--query 'SELECT movie.*, group_concat(genre.name) FROM movie
JOIN movie_genre ON (movie.id = movie_genre.movie_id)
JOIN genre ON (movie_genre.genre_id = genre.id)
WHERE $CONDITIONS
GROUP BY movie.id' \
--split-by movie.id \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/movie \
--delete-target-dir


执行后,/user/hadoop/movielens/movie目录下就有数据了。

hadoop fs -ls /user/hadoop/movielens/movie


image.png


2. 构建Hive表关联HDFS(movie表)

a. 创建一个movielens数据库:

CREATE DATABASE IF NOT EXISTS movielens;

b. 创建一张临时存放数据的表:


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie_temp(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
LOCATION '/user/hadoop/movielens/movie';


建完表,movie_temp表就已经有数据了!

select count(*) from movie_temp;


image.png


c. 创建一张真实存放数据的表(将临时表的数据导入到这里来)


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>)
STORED AS PARQUET;


d. 将临时表的数据导入到这里来

INSERT OVERWRITE TABLE movielens.movie SELECT * FROM movielens.movie_temp;

查看movie表有没有数据:

select count(*) from movie;


image.png


e. 最后删除临时表

DROP TABLE movielens.movie_temp;


3. 构建Hive表关联HDFS(user_rating表)

a. 启动Sqoop的metastore服务:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


## 增量的将user_rating的数据从mysql中导入到hdfs中
sqoop job --create user_rating_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--table user_rating -m 5 --split-by dt_time \
--incremental append --check-column dt_time \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_rating_stage


报错:

image.png


解决:

拷贝java-json.jar包放于$SQOOP_HOME/lib路径下:

image.png


c. 执行user_rating_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_rating_stage


image.png

d. 创建一张临时存放数据的表:


DROP TABLE IF EXISTS user_rating_stage;
CREATE EXTERNAL TABLE user_rating_stage (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_rating_stage';


select count(*) from user_rating_stage;

image.png


e. 创建一张以后会含有重复评分的user_rating表

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 挑选user_rating_stage的相应字段插入到user_rating表:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


查看一下是否有数据:

select count(*) from user_rating;

image.png


g. 创建评分事实表(没有重复数据的)

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating_fact (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;


将user_rating表进行去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


查看数据,因为表数据没改变,所以都是10w条:

select count(*) from user_rating_fact;

image.png


3.1 模拟增量添加数据操作

a. 打开一个终端进入Mysql,插入数据:

insert into user_rating values(0, 196, 242, 5, STR_TO_DATE("1970-01-12 12:47:30", "%Y-%m-%d %T"));
insert into user_rating values(1, 186, 302, 4, STR_TO_DATE("1970-01-12 15:41:57", "%Y-%m-%d %T"));
insert into user_rating values(2, 22, 377, 3, STR_TO_DATE("1970-01-12 12:08:07", "%Y-%m-%d %T"));
insert into user_rating values(3, 244, 51, 1, STR_TO_DATE("1970-01-12 12:36:46", "%Y-%m-%d %T"));
insert into user_rating values(4, 166, 346, 1, STR_TO_DATE("1970-01-12 14:13:17", "%Y-%m-%d %T"));
insert into user_rating values(5, 298, 474, 4, STR_TO_DATE("1970-01-12 13:36:22", "%Y-%m-%d %T"));
insert into user_rating values(6, 115, 265, 2, STR_TO_DATE("1970-01-12 12:46:11", "%Y-%m-%d %T"));
insert into user_rating values(7, 253, 465, 5, STR_TO_DATE("1970-01-12 15:40:28", "%Y-%m-%d %T"));
insert into user_rating values(8, 305, 451, 3, STR_TO_DATE("1970-01-12 14:12:04", "%Y-%m-%d %T"));


b. 重新执行user_rating_import任务

要先删除user_rating_stage路径下的内容:

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

重新执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -cat /user/hadoop/movielens/user_rating_stage/*


image.png


只有我们插入的那几条数据!

c. 将新增的数据插入到评分记录表中:

INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


此时评分记录表含有100000+9=1000009条记录

select count(*) from user_rating;

image.png


d. 评分记录表重新操作可以得到评分的实时表:

操作之前看一下我们Mysql的记录,其实我们是已经插入了另外一条了,等一下我们要进行去重操作,最后才会得到我们的评分事实表(user_rating_fact):


image.png


e. 去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


select count(*) from user_rating_fact;

select * from user_rating_fact where id = 1;

image.png


发现,我们id为1的这条记录的评分,是拿了最新的那个时间的,即评分为4的。


4. 构建Hive表关联HDFS(users表)

a. 前提已启动Sqoop的metastore服务,否则:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


sqoop job --create user_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
-m 5 --split-by last_modified \
--incremental lastmodified --check-column last_modified \
--query 'SELECT user.id, user.gender, user.zip_code, user.age, occupation.occupation, user.last_modified
FROM user JOIN occupation ON user.occupation_id = occupation.id WHERE $CONDITIONS' \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_stage 


c. 执行user_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_stage

执行任务:

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_stage


image.png


d. 创建一张临时用户表user_stage:

DROP TABLE IF EXISTS user_stage;
CREATE EXTERNAL TABLE user_stage (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_stage';


e. 创建用户历史表user_rating_history:

CREATE EXTERNAL TABLE IF NOT EXISTS user_history (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 将临时表的数据先插入到user_history表:

INSERT INTO TABLE user_history PARTITION(year, month, day)
SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


执行完,我们的user_history表的数据就是Mysql未改变前的数据。

g. 创建一张用户表users(不要用user,因为可能会跟内置的用户冲突):


CREATE EXTERNAL TABLE IF NOT EXISTS users (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
STORED AS PARQUET;


合并修改更新后的user(左连接:排除users表中修改过的数据,等下直接拿user_stage倒进来就可以了)

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;

第一次没有修改,也没有添加,所以users表数据与user_stage一样。


4.1 模拟修改与增量数据操作

a. 更新Mysql的数据:


update user set gender='F', last_modified=now() where id=1;
insert into user values(944, 'F', "345642", 20, 19, now());


b. 同步Mysql数据到HDFS:

hadoop fs -rm -r /user/hadoop/movielens/user_stage

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码即可同步!

hadoop fs -cat /user/hadoop/movielens/user_stage/*


image.png


c. 插入数据到user_history:

INSERT INTO TABLE user_history PARTITION(year, month, day) SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


d. 合并修改更新后的user

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


image.png


原本943条条记录,进过上面命令,排除了1条修改过的记录,还有943-1=942条。

e. 插入临时表里面的数据:

INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;


image.png


942+2=944条记录。


0xFF 总结


  1. 过程繁琐,写教程难度大,写了好几个钟,辛苦了。
  2. 本教程的特色是加上我们的增量、已经修改增量,比较贴切企业开发每天的数据。
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
SQL 存储 分布式数据库
分布式存储数据恢复—hbase和hive数据库数据恢复案例
分布式存储数据恢复环境: 16台某品牌R730xd服务器节点,每台服务器节点上有数台虚拟机。 虚拟机上部署Hbase和Hive数据库。 分布式存储故障: 数据库底层文件被误删除,数据库不能使用。要求恢复hbase和hive数据库。
519 12
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
439 3
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
438 0
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
269 0
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
358 0
|
8月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
536 158
|
8月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。
|
8月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
1422 152
|
8月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎,提供高性价比、稳定安全的云数据库服务,适用于多种行业与业务场景。
1027 156
|
8月前
|
缓存 监控 关系型数据库
使用MYSQL Report分析数据库性能(中)
使用MYSQL Report分析数据库性能
577 156

热门文章

最新文章