使用Sqoop导出Mysql数据到Hive(实战案例)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用Sqoop导出Mysql数据到Hive(实战案例)

0x00 教程内容


  1. SQL文件准备
  2. 导出Mysql数据到Hive

前提:

  1. 安装好了Hive
  2. 安装好了Sqoop


0x01 SQL文件准备


1. 准备sql脚本

a. 准备建表与添加数据sql文件


image.png


2. 执行sql脚本

source /home/hadoop/scripts/sql/init_exec.sql

PS:文件多的话,可能执行起来会有点慢。


0x02 导出Mysql数据到Hive


1. 导出数据到HDFS

a. 将Mysql中带有电影种类信息的电影数据导入到HDFS中

sqoop import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--query 'SELECT movie.*, group_concat(genre.name) FROM movie
JOIN movie_genre ON (movie.id = movie_genre.movie_id)
JOIN genre ON (movie_genre.genre_id = genre.id)
WHERE $CONDITIONS
GROUP BY movie.id' \
--split-by movie.id \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/movie \
--delete-target-dir


执行后,/user/hadoop/movielens/movie目录下就有数据了。

hadoop fs -ls /user/hadoop/movielens/movie


image.png


2. 构建Hive表关联HDFS(movie表)

a. 创建一个movielens数据库:

CREATE DATABASE IF NOT EXISTS movielens;

b. 创建一张临时存放数据的表:


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie_temp(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
LOCATION '/user/hadoop/movielens/movie';


建完表,movie_temp表就已经有数据了!

select count(*) from movie_temp;


image.png


c. 创建一张真实存放数据的表(将临时表的数据导入到这里来)


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>)
STORED AS PARQUET;


d. 将临时表的数据导入到这里来

INSERT OVERWRITE TABLE movielens.movie SELECT * FROM movielens.movie_temp;

查看movie表有没有数据:

select count(*) from movie;


image.png


e. 最后删除临时表

DROP TABLE movielens.movie_temp;


3. 构建Hive表关联HDFS(user_rating表)

a. 启动Sqoop的metastore服务:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


## 增量的将user_rating的数据从mysql中导入到hdfs中
sqoop job --create user_rating_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--table user_rating -m 5 --split-by dt_time \
--incremental append --check-column dt_time \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_rating_stage


报错:

image.png


解决:

拷贝java-json.jar包放于$SQOOP_HOME/lib路径下:

image.png


c. 执行user_rating_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_rating_stage


image.png

d. 创建一张临时存放数据的表:


DROP TABLE IF EXISTS user_rating_stage;
CREATE EXTERNAL TABLE user_rating_stage (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_rating_stage';


select count(*) from user_rating_stage;

image.png


e. 创建一张以后会含有重复评分的user_rating表

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 挑选user_rating_stage的相应字段插入到user_rating表:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


查看一下是否有数据:

select count(*) from user_rating;

image.png


g. 创建评分事实表(没有重复数据的)

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating_fact (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;


将user_rating表进行去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


查看数据,因为表数据没改变,所以都是10w条:

select count(*) from user_rating_fact;

image.png


3.1 模拟增量添加数据操作

a. 打开一个终端进入Mysql,插入数据:

insert into user_rating values(0, 196, 242, 5, STR_TO_DATE("1970-01-12 12:47:30", "%Y-%m-%d %T"));
insert into user_rating values(1, 186, 302, 4, STR_TO_DATE("1970-01-12 15:41:57", "%Y-%m-%d %T"));
insert into user_rating values(2, 22, 377, 3, STR_TO_DATE("1970-01-12 12:08:07", "%Y-%m-%d %T"));
insert into user_rating values(3, 244, 51, 1, STR_TO_DATE("1970-01-12 12:36:46", "%Y-%m-%d %T"));
insert into user_rating values(4, 166, 346, 1, STR_TO_DATE("1970-01-12 14:13:17", "%Y-%m-%d %T"));
insert into user_rating values(5, 298, 474, 4, STR_TO_DATE("1970-01-12 13:36:22", "%Y-%m-%d %T"));
insert into user_rating values(6, 115, 265, 2, STR_TO_DATE("1970-01-12 12:46:11", "%Y-%m-%d %T"));
insert into user_rating values(7, 253, 465, 5, STR_TO_DATE("1970-01-12 15:40:28", "%Y-%m-%d %T"));
insert into user_rating values(8, 305, 451, 3, STR_TO_DATE("1970-01-12 14:12:04", "%Y-%m-%d %T"));


b. 重新执行user_rating_import任务

要先删除user_rating_stage路径下的内容:

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

重新执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -cat /user/hadoop/movielens/user_rating_stage/*


image.png


只有我们插入的那几条数据!

c. 将新增的数据插入到评分记录表中:

INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


此时评分记录表含有100000+9=1000009条记录

select count(*) from user_rating;

image.png


d. 评分记录表重新操作可以得到评分的实时表:

操作之前看一下我们Mysql的记录,其实我们是已经插入了另外一条了,等一下我们要进行去重操作,最后才会得到我们的评分事实表(user_rating_fact):


image.png


e. 去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


select count(*) from user_rating_fact;

select * from user_rating_fact where id = 1;

image.png


发现,我们id为1的这条记录的评分,是拿了最新的那个时间的,即评分为4的。


4. 构建Hive表关联HDFS(users表)

a. 前提已启动Sqoop的metastore服务,否则:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


sqoop job --create user_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
-m 5 --split-by last_modified \
--incremental lastmodified --check-column last_modified \
--query 'SELECT user.id, user.gender, user.zip_code, user.age, occupation.occupation, user.last_modified
FROM user JOIN occupation ON user.occupation_id = occupation.id WHERE $CONDITIONS' \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_stage 


c. 执行user_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_stage

执行任务:

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_stage


image.png


d. 创建一张临时用户表user_stage:

DROP TABLE IF EXISTS user_stage;
CREATE EXTERNAL TABLE user_stage (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_stage';


e. 创建用户历史表user_rating_history:

CREATE EXTERNAL TABLE IF NOT EXISTS user_history (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 将临时表的数据先插入到user_history表:

INSERT INTO TABLE user_history PARTITION(year, month, day)
SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


执行完,我们的user_history表的数据就是Mysql未改变前的数据。

g. 创建一张用户表users(不要用user,因为可能会跟内置的用户冲突):


CREATE EXTERNAL TABLE IF NOT EXISTS users (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
STORED AS PARQUET;


合并修改更新后的user(左连接:排除users表中修改过的数据,等下直接拿user_stage倒进来就可以了)

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;

第一次没有修改,也没有添加,所以users表数据与user_stage一样。


4.1 模拟修改与增量数据操作

a. 更新Mysql的数据:


update user set gender='F', last_modified=now() where id=1;
insert into user values(944, 'F', "345642", 20, 19, now());


b. 同步Mysql数据到HDFS:

hadoop fs -rm -r /user/hadoop/movielens/user_stage

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码即可同步!

hadoop fs -cat /user/hadoop/movielens/user_stage/*


image.png


c. 插入数据到user_history:

INSERT INTO TABLE user_history PARTITION(year, month, day) SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


d. 合并修改更新后的user

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


image.png


原本943条条记录,进过上面命令,排除了1条修改过的记录,还有943-1=942条。

e. 插入临时表里面的数据:

INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;


image.png


942+2=944条记录。


0xFF 总结


  1. 过程繁琐,写教程难度大,写了好几个钟,辛苦了。
  2. 本教程的特色是加上我们的增量、已经修改增量,比较贴切企业开发每天的数据。
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
233 4
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
345 3
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
322 0
|
2月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
128 3
|
2月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。
|
3月前
|
存储 运维 关系型数据库
从MySQL到云数据库,数据库迁移真的有必要吗?
本文探讨了企业在业务增长背景下,是否应从 MySQL 迁移至云数据库的决策问题。分析了 MySQL 的优势与瓶颈,对比了云数据库在存储计算分离、自动化运维、多负载支持等方面的优势,并提出判断迁移必要性的五个关键问题及实施路径,帮助企业理性决策并落地迁移方案。
|
2月前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
2月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
|
2月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。
|
2月前
|
SQL 关系型数据库 MySQL
Mysql数据恢复—Mysql数据库delete删除后数据恢复案例
本地服务器,操作系统为windows server。服务器上部署mysql单实例,innodb引擎,独立表空间。未进行数据库备份,未开启binlog。 人为误操作使用Delete命令删除数据时未添加where子句,导致全表数据被删除。删除后未对该表进行任何操作。需要恢复误删除的数据。 在本案例中的mysql数据库未进行备份,也未开启binlog日志,无法直接还原数据库。
下一篇
oss云网关配置