使用Sqoop导出Mysql数据到Hive(实战案例)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 使用Sqoop导出Mysql数据到Hive(实战案例)

0x00 教程内容


  1. SQL文件准备
  2. 导出Mysql数据到Hive

前提:

  1. 安装好了Hive
  2. 安装好了Sqoop


0x01 SQL文件准备


1. 准备sql脚本

a. 准备建表与添加数据sql文件


image.png


2. 执行sql脚本

source /home/hadoop/scripts/sql/init_exec.sql

PS:文件多的话,可能执行起来会有点慢。


0x02 导出Mysql数据到Hive


1. 导出数据到HDFS

a. 将Mysql中带有电影种类信息的电影数据导入到HDFS中

sqoop import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--query 'SELECT movie.*, group_concat(genre.name) FROM movie
JOIN movie_genre ON (movie.id = movie_genre.movie_id)
JOIN genre ON (movie_genre.genre_id = genre.id)
WHERE $CONDITIONS
GROUP BY movie.id' \
--split-by movie.id \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/movie \
--delete-target-dir


执行后,/user/hadoop/movielens/movie目录下就有数据了。

hadoop fs -ls /user/hadoop/movielens/movie


image.png


2. 构建Hive表关联HDFS(movie表)

a. 创建一个movielens数据库:

CREATE DATABASE IF NOT EXISTS movielens;

b. 创建一张临时存放数据的表:


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie_temp(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
LOCATION '/user/hadoop/movielens/movie';


建完表,movie_temp表就已经有数据了!

select count(*) from movie_temp;


image.png


c. 创建一张真实存放数据的表(将临时表的数据导入到这里来)


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>)
STORED AS PARQUET;


d. 将临时表的数据导入到这里来

INSERT OVERWRITE TABLE movielens.movie SELECT * FROM movielens.movie_temp;

查看movie表有没有数据:

select count(*) from movie;


image.png


e. 最后删除临时表

DROP TABLE movielens.movie_temp;


3. 构建Hive表关联HDFS(user_rating表)

a. 启动Sqoop的metastore服务:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


## 增量的将user_rating的数据从mysql中导入到hdfs中
sqoop job --create user_rating_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--table user_rating -m 5 --split-by dt_time \
--incremental append --check-column dt_time \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_rating_stage


报错:

image.png


解决:

拷贝java-json.jar包放于$SQOOP_HOME/lib路径下:

image.png


c. 执行user_rating_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_rating_stage


image.png

d. 创建一张临时存放数据的表:


DROP TABLE IF EXISTS user_rating_stage;
CREATE EXTERNAL TABLE user_rating_stage (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_rating_stage';


select count(*) from user_rating_stage;

image.png


e. 创建一张以后会含有重复评分的user_rating表

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 挑选user_rating_stage的相应字段插入到user_rating表:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


查看一下是否有数据:

select count(*) from user_rating;

image.png


g. 创建评分事实表(没有重复数据的)

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating_fact (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;


将user_rating表进行去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


查看数据,因为表数据没改变,所以都是10w条:

select count(*) from user_rating_fact;

image.png


3.1 模拟增量添加数据操作

a. 打开一个终端进入Mysql,插入数据:

insert into user_rating values(0, 196, 242, 5, STR_TO_DATE("1970-01-12 12:47:30", "%Y-%m-%d %T"));
insert into user_rating values(1, 186, 302, 4, STR_TO_DATE("1970-01-12 15:41:57", "%Y-%m-%d %T"));
insert into user_rating values(2, 22, 377, 3, STR_TO_DATE("1970-01-12 12:08:07", "%Y-%m-%d %T"));
insert into user_rating values(3, 244, 51, 1, STR_TO_DATE("1970-01-12 12:36:46", "%Y-%m-%d %T"));
insert into user_rating values(4, 166, 346, 1, STR_TO_DATE("1970-01-12 14:13:17", "%Y-%m-%d %T"));
insert into user_rating values(5, 298, 474, 4, STR_TO_DATE("1970-01-12 13:36:22", "%Y-%m-%d %T"));
insert into user_rating values(6, 115, 265, 2, STR_TO_DATE("1970-01-12 12:46:11", "%Y-%m-%d %T"));
insert into user_rating values(7, 253, 465, 5, STR_TO_DATE("1970-01-12 15:40:28", "%Y-%m-%d %T"));
insert into user_rating values(8, 305, 451, 3, STR_TO_DATE("1970-01-12 14:12:04", "%Y-%m-%d %T"));


b. 重新执行user_rating_import任务

要先删除user_rating_stage路径下的内容:

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

重新执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -cat /user/hadoop/movielens/user_rating_stage/*


image.png


只有我们插入的那几条数据!

c. 将新增的数据插入到评分记录表中:

INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


此时评分记录表含有100000+9=1000009条记录

select count(*) from user_rating;

image.png


d. 评分记录表重新操作可以得到评分的实时表:

操作之前看一下我们Mysql的记录,其实我们是已经插入了另外一条了,等一下我们要进行去重操作,最后才会得到我们的评分事实表(user_rating_fact):


image.png


e. 去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


select count(*) from user_rating_fact;

select * from user_rating_fact where id = 1;

image.png


发现,我们id为1的这条记录的评分,是拿了最新的那个时间的,即评分为4的。


4. 构建Hive表关联HDFS(users表)

a. 前提已启动Sqoop的metastore服务,否则:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


sqoop job --create user_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
-m 5 --split-by last_modified \
--incremental lastmodified --check-column last_modified \
--query 'SELECT user.id, user.gender, user.zip_code, user.age, occupation.occupation, user.last_modified
FROM user JOIN occupation ON user.occupation_id = occupation.id WHERE $CONDITIONS' \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_stage 


c. 执行user_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_stage

执行任务:

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_stage


image.png


d. 创建一张临时用户表user_stage:

DROP TABLE IF EXISTS user_stage;
CREATE EXTERNAL TABLE user_stage (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_stage';


e. 创建用户历史表user_rating_history:

CREATE EXTERNAL TABLE IF NOT EXISTS user_history (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 将临时表的数据先插入到user_history表:

INSERT INTO TABLE user_history PARTITION(year, month, day)
SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


执行完,我们的user_history表的数据就是Mysql未改变前的数据。

g. 创建一张用户表users(不要用user,因为可能会跟内置的用户冲突):


CREATE EXTERNAL TABLE IF NOT EXISTS users (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
STORED AS PARQUET;


合并修改更新后的user(左连接:排除users表中修改过的数据,等下直接拿user_stage倒进来就可以了)

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;

第一次没有修改,也没有添加,所以users表数据与user_stage一样。


4.1 模拟修改与增量数据操作

a. 更新Mysql的数据:


update user set gender='F', last_modified=now() where id=1;
insert into user values(944, 'F', "345642", 20, 19, now());


b. 同步Mysql数据到HDFS:

hadoop fs -rm -r /user/hadoop/movielens/user_stage

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码即可同步!

hadoop fs -cat /user/hadoop/movielens/user_stage/*


image.png


c. 插入数据到user_history:

INSERT INTO TABLE user_history PARTITION(year, month, day) SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


d. 合并修改更新后的user

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


image.png


原本943条条记录,进过上面命令,排除了1条修改过的记录,还有943-1=942条。

e. 插入临时表里面的数据:

INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;


image.png


942+2=944条记录。


0xFF 总结


  1. 过程繁琐,写教程难度大,写了好几个钟,辛苦了。
  2. 本教程的特色是加上我们的增量、已经修改增量,比较贴切企业开发每天的数据。
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
22天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
44 3
|
22天前
|
SQL 关系型数据库 MySQL
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
|
22天前
|
SQL 关系型数据库 MySQL
案例剖析,MySQL共享锁引发的死锁问题!
案例剖析,MySQL共享锁引发的死锁问题!
|
22天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
93 0
|
5天前
|
关系型数据库 MySQL 数据库
一个 MySQL 数据库死锁的案例和解决方案
本文介绍了一个 MySQL 数据库死锁的案例和解决方案。
15 3
|
8天前
|
存储 关系型数据库 MySQL
基于案例分析 MySQL 权限认证中的具体优先原则
【10月更文挑战第26天】本文通过具体案例分析了MySQL权限认证中的优先原则,包括全局权限、数据库级别权限和表级别权限的设置与优先级。全局权限优先于数据库级别权限,后者又优先于表级别权限。在权限冲突时,更严格的权限将被优先执行,确保数据库的安全性与资源合理分配。
|
23天前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
43 3
|
22天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
61 0
|
分布式计算 关系型数据库 MySQL
E-Mapreduce如何处理RDS的数据
目前网站的一些业务数据存在了数据库中,这些数据往往需要做进一步的分析,如:需要跟一些日志数据关联分析,或者需要进行一些如机器学习的分析。在阿里云上,目前E-Mapreduce可以满足这类进一步分析的需求。
4970 0
|
16天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
50 3
Mysql(4)—数据库索引

热门文章

最新文章