什么是ETL:
1、ETL:E(抽取),T(转换与清洗),L(装载到HDFS/HIVE)
2、ETL很难做到自动化,只能靠工程师参与
3、ETL实现技术:
1)商业工具,开源工具(kettle、sqoop、flume)
2)SQL语句(select-->SQL函数-->insert)
3)Python/Java操作数据库/文件<-->HDFS/HIVE
4、Sqoop:RDBMS<-->HDFS/HIVE;Flume:(实时采集)日志文件-->HDFS/HIVE
sqoop环境搭建
进入root管理员用户 su root cd /usr/local
启动所有配置的环境变量:source /etc/profile
启动Hadoop:start-all.sh
复制压缩包到当前路径:cp /media/sf_linux_share/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz ./
复制/java-json.jar到本地/usr/local:cp /media/sf_linux_share/java-json.jar .
解压解包:tar zxf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
起别名:mv sqoop-1.4.7.bin__hadoop-2.6.0 sqoop
配置环境变量:
gedit /etc/profile
加入下面代码:
Export SQOOP_HOME=/usr/local/sqoop
Export PATH=$SQOOP_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:/usr/local
再次刷新保存环境变量:source /etc/profile
复制架包到sqoop/lib 目录:
cp /media/sf_linux_share/mysql-connector-java-5.1.47.jar $SQOOP_HOME/lib
cp /media/sf_linux_share/java-json.jar $SQOOP_HOME/lib
复制sqoop文件模板:
cp sqoop/conf/sqoop-env-template.sh sqoop/conf/sqoop-env.sh
gedit /usr/local/sqoop/conf/sqoop-env.sh
export HADOOP_COMMON_HOME=/usr/local/hadoop3
export HADOOP_MAPRED_HOME=/usr/local/hadoop3
export HIVE_HOME=/usr/local/hive3
Sqoop进行全量导入
(敏捷迭代时构建数据仓库)
进入MySQL建库建表:
create database sqoop_pro_db;
use sqoop_pro_db;
create table append_test(id int(10),name varchar(255));
show tables;
desc append_test;
插入数据:
insert into append_test(id,name) values(1,'lxt001');
insert into append_test(id,name) values(2,'lxt002');
insert into append_test(id,name) values(3,'lxt003');
强制性递归删除:hdfs dfs -rm -f -r /user/root/append_test
全量导入:
先将数据导入hdfs,再通过hive下建表结构完成数据的导入
sqoop import --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --password 123456 --table append_test --fields-terminated-by ',' -m 1 --bindir ./
在Hive中创建外部表:(将MySQL的数据都导入到HDFS的hive上面)
create external table append_test(id int,name string) row format delimited fields terminated by ',' location '/user/root/append_test';
Select * from append_test;
指定导入路径--target-dir:
sqoop import --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --password 123456 --table append_test --fields-terminated-by ',' -m 1 --bindir ./ --target-dir /xhb001
每天产生的新数据就增量导入
Sqoop支持增量导入的参数:
--incremental append/lastmodified
--check-column 设置监控列
--last-value 设置导入的起点
基于递增列的增量数据导入(Append方式),不会导入更新的数据行。把新插入的数据当作增量,更新过的数据不会增量导入。(触发器等机制监控update)
sqoop import --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --password 123456 --table append_test -m 1 --incremental append --check-column id --last-value 0
append方式增量导入问题:这是新增数据才能导入。修改的数据无法导入
基于时间列的数据增量导入(LastModified方式)( 必须要有时间戳列) lastmodified增量导入
1)MySQL中建表
CREATE TABLE lastModifyTest (id INT,name VARCHAR (20),last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
2)insert into lastModifyTest(id,name) values(1,'enzo');
insert into lastModifyTest(id,name) values(2,'din');
insert into lastModifyTest(id,name) values(3,'fz');
insert into lastModifyTest(id,name) values(4,'dx');
insert into lastModifyTest(id,name) values(5,'ef');
3)导入到HDFS
sqoop import --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --P --table lastModifyTest -m 1 --target-dir /bigbi/last_mod --columns id,name,last_mod --incremental lastmodified --check-column last_mod --last-value "2020-09-19 14:50:07" --append --bindir ./
--P:输入密码
--columns id,name,last_mod:设置抽取的列与顺序
--incremental lastmodified:
--append:设置为追加数据
--bindir ./:绑定目录
hdfs dfs -ls /bigbi/last_mod
hdfs dfs -cat /bigbi/last_mod/part-m-00000
在hive里创建一个外部表
create external table last_mod(id int,name string,last_modify string) row format delimited fields terminated by ',' location '/bigbi/last_mod';
查询:select * from last_mod;
查验:在MySQL端进行以下操作
删除:delete from lastModifyTest where id=2;
更新:update lastModifyTest set name='diaa' where id=1;
插入:insert into lastModifyTest(id,name) values(6,'lxt006');
查询:select * from lastModifyTest;
再次执行一次增量导入的命令
发现原数据会保留 二次执行的数据再后面以更新后的姿态显示
自动化增量导入
创建Sqoop job任务,名字increment_import_job(job会自动记录last-value)
新建job代码:
sqoop job --create increment_import_job -- import --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --P --table lastModifyTest -m 1 --target-dir /bigbi/last_mod --columns id,name,last_mod --incremental lastmodified --check-column last_mod --last-value "2020-09-19 14:50:07" --append --bindir ./
job管理:
将job列出来“sqoop job --list
显示指定job任务的详细信息:sqoop job --show myjob
执行任务: sqoop job --exec increment_import_job
删除任务:sqoop job --delete increment_import_job
Shell脚本自动化增量导入
编写myjob.log:
生成空文件:touch myjob.log
编写shell脚本(gedit myjob.sh)
!/bin/sh (#!简单的注释,告诉我们是在哪里运行的shell)
current_time=$(date +%Y%m%d%H%M%S) ###获取系统当前时间
shell use absulute path
echo 'start import.............'>>/usr/local/sqoop_pro/myjob.log #####开始导入
echo $current_time >> /usr/local/sqoop_pro/myjob.log ###将获取到的时间重定向到日志文件
echo ............................>>/usr/local/sqoop_pro/myjob.log
echo 'import success.............'>>/usr/local/sqoop_pro/myjob.log #####导入成功
在shell脚本下添加以下命令,执行我们创建的sqoop job的任务:
/usr/local/sqoop/bin/sqoop job --exec increment_import_job
授予可执行权限:chmod 777 myjob.sh
在当前目录下执行:./myjob.sh
创建Sqoop任务时候的明文密码会被忽略,要手动输入密码,这个需要解决才能定时执行(无人值守)
设置conf/sqoop-site.xml文件, gedit /usr/local/sqoop1/conf/sqoop-site.xml添加:
<name>sqoop.metastore.client.record.password</name>
<value>true</value>
本地执行一次:./myjob.sh
会保存输入的密码,之后就无需输入密码了(验证方法:执行第一次./myjob.sh,然后第二次再次执行./myjob.sh无需让你输入密码表示成功设置。)
设置定时增量导入
增量导入每天都会产生新数据,人工导入过于麻烦,所以要用shell设置定时增量导入。
定时执行,自动化执行:
1:cron(myjob.cron):设置定时任务(cron最流行的)
在myjob.cron文件设置cron表达式去调用shell脚本
2:shell脚本(myjob.sh):(重点任务),把系统级别的一些任务,ETL任务可以放到脚本
Python脚本实现业务级别的任务,自动化增量导入用的是shell脚本。
3:日志文件(myjob.log):记录任务执行情况
编写cron文件来定时调度:
添加定时任务到系统:crontab myjob.cron
列出当前用户的定时任务:crontab -l
查看任务完成日志:cat myjob.log
删除当前用户的定时任务:crontab -r
编辑:gedit myjob.cron
1 45 12 * 、/bin/bash/usr/local/sqoop_pro/myjob.sh>>/usr/local/sqoop_pro/myjob.log 2>&1
12:45分执行
查看一下日志文件:cat myjob.jog
Sqoop导出Hive数据到MySQL
1.mysql> create table last_mod_3(id varchar(50),name varchar(50),last_mod varchar(50));
** 注意,为了兼容Hive的格式,需要将id、last_mod列改成varchar类型。
2.执行下面sqoop export命令会出现java.lang.ClassNotFoundException,拷贝Sqoop生成的jar文件到sqoop/lib目录中即可。
找到生成的jar包:find / -name last_mod_2.jar
拷贝到sqoop/lib目录可:cp /usr/local/last_mod_2.jar sqoop/lib
- 执行导出:
sqoop export --connect jdbc:mysql://localhost:3306/sqoop_pro_db --username root --password 123456 --table last_mod_3 --export-dir /bigbi/last_mod --input-fields-terminated-by ',' --input-lines-terminated-by '\n' --input-null-string '\N' --input-null-non-string '\N'
--table 设置接收数据的MySQL表last_mod_2
--export-dir 设置外部表对应的HDFS目录 /bigbi/last_mod
--input-lines-terminated-by '\n' 设置行分隔符
--input-null-string '\N' --input-null-non-string '\N' 设置空值处理(null)