使用Sqoop导出Mysql数据到Hive(实战案例)

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS Agent(兼容OpenClaw),2核4GB
简介: 使用Sqoop导出Mysql数据到Hive(实战案例)

0x00 教程内容


  1. SQL文件准备
  2. 导出Mysql数据到Hive

前提:

  1. 安装好了Hive
  2. 安装好了Sqoop


0x01 SQL文件准备


1. 准备sql脚本

a. 准备建表与添加数据sql文件


image.png


2. 执行sql脚本

source /home/hadoop/scripts/sql/init_exec.sql

PS:文件多的话,可能执行起来会有点慢。


0x02 导出Mysql数据到Hive


1. 导出数据到HDFS

a. 将Mysql中带有电影种类信息的电影数据导入到HDFS中

sqoop import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--query 'SELECT movie.*, group_concat(genre.name) FROM movie
JOIN movie_genre ON (movie.id = movie_genre.movie_id)
JOIN genre ON (movie_genre.genre_id = genre.id)
WHERE $CONDITIONS
GROUP BY movie.id' \
--split-by movie.id \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/movie \
--delete-target-dir


执行后,/user/hadoop/movielens/movie目录下就有数据了。

hadoop fs -ls /user/hadoop/movielens/movie


image.png


2. 构建Hive表关联HDFS(movie表)

a. 创建一个movielens数据库:

CREATE DATABASE IF NOT EXISTS movielens;

b. 创建一张临时存放数据的表:


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie_temp(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
LOCATION '/user/hadoop/movielens/movie';


建完表,movie_temp表就已经有数据了!

select count(*) from movie_temp;


image.png


c. 创建一张真实存放数据的表(将临时表的数据导入到这里来)


CREATE EXTERNAL TABLE IF NOT EXISTS movielens.movie(
  id INT, 
  title STRING,
  release_date STRING,
  video_release_date STRING,
  imdb_url STRING, 
  genres ARRAY<STRING>)
STORED AS PARQUET;


d. 将临时表的数据导入到这里来

INSERT OVERWRITE TABLE movielens.movie SELECT * FROM movielens.movie_temp;

查看movie表有没有数据:

select count(*) from movie;


image.png


e. 最后删除临时表

DROP TABLE movielens.movie_temp;


3. 构建Hive表关联HDFS(user_rating表)

a. 启动Sqoop的metastore服务:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


## 增量的将user_rating的数据从mysql中导入到hdfs中
sqoop job --create user_rating_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
--table user_rating -m 5 --split-by dt_time \
--incremental append --check-column dt_time \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_rating_stage


报错:

image.png


解决:

拷贝java-json.jar包放于$SQOOP_HOME/lib路径下:

image.png


c. 执行user_rating_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_rating_stage


image.png

d. 创建一张临时存放数据的表:


DROP TABLE IF EXISTS user_rating_stage;
CREATE EXTERNAL TABLE user_rating_stage (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_rating_stage';


select count(*) from user_rating_stage;

image.png


e. 创建一张以后会含有重复评分的user_rating表

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 挑选user_rating_stage的相应字段插入到user_rating表:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


查看一下是否有数据:

select count(*) from user_rating;

image.png


g. 创建评分事实表(没有重复数据的)

CREATE EXTERNAL TABLE IF NOT EXISTS user_rating_fact (
  id INT,
  user_id INT,
  movie_id INT,
  rating INT,
  dt_time STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;


将user_rating表进行去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


查看数据,因为表数据没改变,所以都是10w条:

select count(*) from user_rating_fact;

image.png


3.1 模拟增量添加数据操作

a. 打开一个终端进入Mysql,插入数据:

insert into user_rating values(0, 196, 242, 5, STR_TO_DATE("1970-01-12 12:47:30", "%Y-%m-%d %T"));
insert into user_rating values(1, 186, 302, 4, STR_TO_DATE("1970-01-12 15:41:57", "%Y-%m-%d %T"));
insert into user_rating values(2, 22, 377, 3, STR_TO_DATE("1970-01-12 12:08:07", "%Y-%m-%d %T"));
insert into user_rating values(3, 244, 51, 1, STR_TO_DATE("1970-01-12 12:36:46", "%Y-%m-%d %T"));
insert into user_rating values(4, 166, 346, 1, STR_TO_DATE("1970-01-12 14:13:17", "%Y-%m-%d %T"));
insert into user_rating values(5, 298, 474, 4, STR_TO_DATE("1970-01-12 13:36:22", "%Y-%m-%d %T"));
insert into user_rating values(6, 115, 265, 2, STR_TO_DATE("1970-01-12 12:46:11", "%Y-%m-%d %T"));
insert into user_rating values(7, 253, 465, 5, STR_TO_DATE("1970-01-12 15:40:28", "%Y-%m-%d %T"));
insert into user_rating values(8, 305, 451, 3, STR_TO_DATE("1970-01-12 14:12:04", "%Y-%m-%d %T"));


b. 重新执行user_rating_import任务

要先删除user_rating_stage路径下的内容:

hadoop fs -rm -r /user/hadoop/movielens/user_rating_stage

重新执行任务:

sqoop job --exec user_rating_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -cat /user/hadoop/movielens/user_rating_stage/*


image.png


只有我们插入的那几条数据!

c. 将新增的数据插入到评分记录表中:

INSERT INTO TABLE user_rating PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year(dt_time) as year, month(dt_time) as month, day(dt_time) as day
FROM user_rating_stage;


此时评分记录表含有100000+9=1000009条记录

select count(*) from user_rating;

image.png


d. 评分记录表重新操作可以得到评分的实时表:

操作之前看一下我们Mysql的记录,其实我们是已经插入了另外一条了,等一下我们要进行去重操作,最后才会得到我们的评分事实表(user_rating_fact):


image.png


e. 去重操作:

WITH t1 as (
  SELECT 
    id, 
    user_id, 
    movie_id, 
    rating, 
    dt_time, 
    year,
    month,
    day,
    ROW_NUMBER() OVER (PARTITION BY user_id, movie_id ORDER BY dt_time DESC) as rn
  FROM user_rating
)
INSERT OVERWRITE TABLE user_rating_fact PARTITION(year, month, day)
SELECT id, user_id, movie_id, rating, dt_time, year, month, day FROM t1 WHERE rn = 1;


select count(*) from user_rating_fact;

select * from user_rating_fact where id = 1;

image.png


发现,我们id为1的这条记录的评分,是拿了最新的那个时间的,即评分为4的。


4. 构建Hive表关联HDFS(users表)

a. 前提已启动Sqoop的metastore服务,否则:

sqoop metastore &

b. 创建一个sqoop job

前提:创建之前确保作业已删除,否则请先删除:

sqoop job --delete user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop


sqoop job --create user_import \
--meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop \
-- import --connect jdbc:mysql://hadoop01:3306/movie \
--username root --password root \
-m 5 --split-by last_modified \
--incremental lastmodified --check-column last_modified \
--query 'SELECT user.id, user.gender, user.zip_code, user.age, occupation.occupation, user.last_modified
FROM user JOIN occupation ON user.occupation_id = occupation.id WHERE $CONDITIONS' \
--fields-terminated-by "\t" \
--target-dir /user/hadoop/movielens/user_stage 


c. 执行user_import任务

前提:确保路径下没内容,如果有要先删除。

hadoop fs -rm -r /user/hadoop/movielens/user_stage

执行任务:

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码,即可开始同步数据。

查看数据:

hadoop fs -ls /user/hadoop/movielens/user_stage


image.png


d. 创建一张临时用户表user_stage:

DROP TABLE IF EXISTS user_stage;
CREATE EXTERNAL TABLE user_stage (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
ROW FORMAT DELIMITED
       FIELDS TERMINATED BY '\t'
LOCATION '/user/hadoop/movielens/user_stage';


e. 创建用户历史表user_rating_history:

CREATE EXTERNAL TABLE IF NOT EXISTS user_history (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS AVRO;


f. 将临时表的数据先插入到user_history表:

INSERT INTO TABLE user_history PARTITION(year, month, day)
SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


执行完,我们的user_history表的数据就是Mysql未改变前的数据。

g. 创建一张用户表users(不要用user,因为可能会跟内置的用户冲突):


CREATE EXTERNAL TABLE IF NOT EXISTS users (
    id INT,
    gender STRING,
    zip_code STRING,
    age INT,
    occupation STRING,
    last_modified STRING
)
STORED AS PARQUET;


合并修改更新后的user(左连接:排除users表中修改过的数据,等下直接拿user_stage倒进来就可以了)

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;

第一次没有修改,也没有添加,所以users表数据与user_stage一样。


4.1 模拟修改与增量数据操作

a. 更新Mysql的数据:


update user set gender='F', last_modified=now() where id=1;
insert into user values(944, 'F', "345642", 20, 19, now());


b. 同步Mysql数据到HDFS:

hadoop fs -rm -r /user/hadoop/movielens/user_stage

sqoop job --exec user_import --meta-connect jdbc:hsqldb:hsql://hadoop01:16000/sqoop

输入Mysql的密码即可同步!

hadoop fs -cat /user/hadoop/movielens/user_stage/*


image.png


c. 插入数据到user_history:

INSERT INTO TABLE user_history PARTITION(year, month, day) SELECT *, year(last_modified) as year, month(last_modified) as month, day(last_modified) as day FROM user_stage;


d. 合并修改更新后的user

INSERT OVERWRITE TABLE users 
SELECT users.id, users.gender, users.zip_code, users.age, users.occupation, users.last_modified 
  FROM users LEFT JOIN user_stage ON users.id = user_stage.id WHERE user_stage.id IS NULL;


image.png


原本943条条记录,进过上面命令,排除了1条修改过的记录,还有943-1=942条。

e. 插入临时表里面的数据:

INSERT INTO TABLE users SELECT id, gender, zip_code, age, occupation, last_modified FROM user_stage;


image.png


942+2=944条记录。


0xFF 总结


  1. 过程繁琐,写教程难度大,写了好几个钟,辛苦了。
  2. 本教程的特色是加上我们的增量、已经修改增量,比较贴切企业开发每天的数据。
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
8月前
|
SQL 关系型数据库 MySQL
Mysql数据恢复—Mysql数据库delete删除后数据恢复案例
本地服务器,操作系统为windows server。服务器上部署mysql单实例,innodb引擎,独立表空间。未进行数据库备份,未开启binlog。 人为误操作使用Delete命令删除数据时未添加where子句,导致全表数据被删除。删除后未对该表进行任何操作。需要恢复误删除的数据。 在本案例中的mysql数据库未进行备份,也未开启binlog日志,无法直接还原数据库。
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
SQL 存储 关系型数据库
MySQL秘籍之索引与查询优化实战指南
最左前缀原则。不冗余原则。最大选择性原则。所谓前缀索引,说白了就是对文本的前几个字符建立索引(具体是几个字符在建立索引时去指定),比如以产品名称的前 10 位来建索引,这样建立起来的索引更小,查询效率更快!
499 22
 MySQL秘籍之索引与查询优化实战指南
|
存储 SQL 关系型数据库
服务器数据恢复—云服务器上mysql数据库数据恢复案例
某ECS网站服务器,linux操作系统+mysql数据库。mysql数据库采用innodb作为默认存储引擎。 在执行数据库版本更新测试时,操作人员误误将在本来应该在测试库执行的sql脚本在生产库上执行,导致生产库上部分表被truncate,还有部分表中少量数据被delete。
350 25
|
SQL 关系型数据库 MySQL
数据库数据恢复——MySQL简介和数据恢复案例
MySQL数据库数据恢复环境&故障: 本地服务器,安装的windows server操作系统。 操作系统上部署MySQL单实例,引擎类型为innodb,表空间类型为独立表空间。该MySQL数据库没有备份,未开启binlog。 人为误操作,在用Delete命令删除数据时未添加where子句进行筛选导致全表数据被删除,删除后未对该表进行任何操作。
|
存储 关系型数据库 MySQL
10个案例告诉你mysql不使用子查询的原因
大家好,我是V哥。上周与朋友讨论数据库子查询问题,深受启发。为此,我整理了10个案例,详细说明如何通过优化子查询提升MySQL性能。主要问题包括性能瓶颈、索引失效、查询优化器复杂度及数据传输开销等。解决方案涵盖使用EXISTS、JOIN、IN操作符、窗口函数、临时表及索引优化等。希望通过这些案例,帮助大家在实际开发中选择更高效的查询方式,提升系统性能。关注V哥,一起探讨技术,欢迎点赞支持!
687 5
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。
|
安全 关系型数据库 MySQL
PHP与MySQL动态网站开发实战指南####
——深入探索LAMP栈下的高效数据交互与处理技巧 ####
|
关系型数据库 MySQL PHP
PHP与MySQL动态网站开发实战指南####
深入探索PHP与MySQL的协同工作机制,本文旨在通过一系列实战案例,揭示构建高效、稳定且用户友好的动态网站的秘诀。从环境搭建到数据交互,再到最佳实践分享,本文为开发者提供了一条清晰的学习路径,助力其在LAMP(Linux, Apache, MySQL, PHP/Perl/Python)栈上实现技术飞跃。 ####

热门文章

最新文章

推荐镜像

更多