Sqoop操作实践

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/50504925 Sqoo...
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/50504925

Sqoop操作实践

@(Hadoop)


Sqoop常用参命令

序号 命令/command 说明
1 impor ImportTool 从关系型数据库中导入数据(来自表或者查询语句)到HDFS中
2 export ExportTool 将HDFS中的数据导入到关系型数据库中
3 codegen CodeGenTool 获取数据库中某张表数据生成Java并打成jar包
4 create-hive-table CreateHiveTableTool 创建Hive表
5 eval EvalSqlTool 查看SQL执行结果
6 import-all-tables ImportAllTablesTool 导入某个数据库下所有表到HDFS中
7 job JobTool
8 list-databases ListDatabasesTool 列出所有数据库名
9 list-tables ListTablesTool 列出某个数据库下所有表
10 merge MergeTool
11 metastore MetastoreTool
12 help HelpTool 查看帮助
13 version VersionTool 查看版本

通用参数

序号 参数 说明 样例
1 connect 连接关系型数据库的URL jdbc:mysql://localhost/sqoop_datas
2 connection-manager 连接管理类,一般不用
3 driver 连接驱动
4 hadoop-home hadoop目录 /home/guoyun/hadoop
5 help 查看帮助信息
6 password 连接关系型数据库的密码
7 username 链接关系型数据库的用户名
8 verbose 查看更多的信息,其实是将日志级别调低

–where和–query导入部分表数据

Sqoop的–import使用–table指定表之后,默认是导入该表的全部数据,有时候我们可能只需要表其中一部分的数据,或者仅仅是导入小部分数据作为测试,那么可以使用–where和–query参数来进行条件限定。

–where:例如”id<100”,只导入该表id小于100的数据,和sql的where条件是一样的。

–query:引号中的是SQL语句,SQL执行的结果就是要导入的数据,必须和–target-dir一起使用。

增量导入

以下三个参数必须同时指定:

–check-column (col):检查指定的列,根据此列判断哪些记录是新数据且需要导入的,列不能是字符相关类型(CHAR/NCHAR/VARCHAR/VARNCHAR/ LONGVARCHAR/LONGNVARCHAR),一般为数据库中的关键字。
–incremental (mode):指定增量模式,mode包含两种方式,append和lastmodified。

  • 当表中的记录是以id持续增加导入新的记录的时候,可以使用append模式,–check-column id 用于检查id。
  • lastmodified: 表有时候也会执行更新操作,此时可以使用lastmodified导入。

–last-value (value): –check-column的某个值,将大于该值的检查列记录导入,以确定仅将新的或者更新后的记录导入新的文件系统。

和–append参数的区别:
–append导入的时候不会再次创建新的HDFS目录(使用普通命令的话,会创建一个新的HDFS目录,如果该目录已存在则会失败),该命令会直接在已存在的目录下继续导入数据,但是不管数据是否重复。

-import-all-tables导入多表

导入的每个表数据被分别存储在以表名命名的HDFS上的不同目录中。
使用该命令以下三个条件必须同时满足:

  • 1、每个表必须都只有一个列作为主键;
  • 2、必须将每个表中所有的数据导入,而不是部分;
  • 3、必须使用默认分隔列,且WHERE子句无任何强加的条件

–table, –split-by, –columns, 和 –where参数在sqoop-import-all-tables命令中是不合法的。

也就是说,使用-import-all-tables就无法使用增量导入和部分导入了。

–exclude-tables:默认是导入该数据库的全部表,如果只想导入部分表,可以使用该参数将不想导入的表排除掉。

map并行任务数

Sqoop并行导入原理:
默认情况下map的任务数是4,假设导入的表主键为id,那么Sqoop会先进行下面这样一个查询。

select max(id) as max, select min(id) as min from table [where 如果指定了where子句];

通过这个查询,获取到需要拆分字段(id)的最大值和最小值,假设分别是1和1000。

然后,Sqoop会根据需要并行导入的数量,进行拆分查询,比如上面的这个例子,并行导入将拆分为如下4条SQL同时执行:

select * from table where 0 <= id < 250;
select * from table where 250 <= id < 500;
select * from table where 500 <= id < 750;
select * from table where 750 <= id < 1000;

注意,这个拆分的字段需要是整数,使用–split-by参数进行指定。
从上面的例子可以看出,如果需要导入的表没有主键,我们应该如何手动选取一个合适的拆分字段,以及选择合适的并行数。

map的任务数不超过集群可以用的mr并行度(节点数),不超过数据库能性能影响的极值。

测试

#测试增量导入
#第一次导入RECORD_NO<100的数据
sqoop import --connect jdbc:oracle:thin:@ip:port:database --username uid --password pwd --target-dir /oracle/$database -m 1 --table tableName  --fields-terminated-by '\001' --where "RECORD_NO<100"

#第二次增量导入RECORD_NO<200的数据,从RECORD_NO=99开始
sqoop import --check-column RECORD_NO --incremental append --last-value 99 --connect jdbc:oracle:thin:@ip:port:database --username uid --password pwd --target-dir /oracle/$database -m 1 --table tableName  --fields-terminated-by '\001' --where "RECORD_NO<200"

#第三次增量导入全部数据,从RECORD_NO=199开始
sqoop import --check-column RECORD_NO --incremental append --last-value 199 --connect jdbc:oracle:thin:@ip:port:database --username uid --password pwd --target-dir /oracle/$database -m 1 --table tableName  --fields-terminated-by '\001' --split-by RECORD_NO

#测试多表导入
sqoop import-all-tables --connect jdbc:oracle:thin:@ip:port:database --username uid --password pwd --target-dir /oracle/$database -m 8 --fields-terminated-by '\001' --exclude-tables excludeTablesName

#单表导入全部数据,使用并行导入,指定分割列
sqoop import --connect jdbc:oracle:thin:@ip:port:database --username uid --password pwd --target-dir /oracle/$database -m 1 --table tableName  --fields-terminated-by '\001' --split-by PAYMENT_ID

测试失败,待重试

定时导入脚本实现

#!/bin/bash

#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号
url=jdbc:oracle:thin:@ip:port:database
#使用的用户名
uid=username
#使用的密码
pwd=password
#需要从Oracle中导入的表名
tableName=tableName
#需要从Oracle中导入的表中的字段名
columns=columns
#将Oracle中的数据导入到HDFS后的存放路径
hdfsPath=path/$tableName

#执行导入逻辑。将Oracle中的数据导入到HDFS中
sqoop import --connect $url --username $uid --password $pwd --target-dir $hdfsPath  --m 1 --table $tableName --columns $columns --fields-terminated-by '\001'

设置定时执行

#编辑cron文件,设置定时执行
crontab -e

#文件内容,每天凌晨1点执行数据导入脚本
* 1 * * * 脚本所在路径

#保存退出之后即可

关于crontab一些知识请看:
linux设置定制器自动执行任务

注意事项

  • 导入关系型数据库的数据时,确保集群上的所有节点都能连接到对应数据库服务器的IP和端口号!
  • 用户名和表名一定要大写!

可以先使用list-tables测试能否连通再执行mr导入数据
统计导入的数据行数:

hadoop fs -cat  /导入的文件 | wc -l 

导入HBase示例

#使用query参数自定义结果集,全量导入测试通过: 
sqoop import  --connect jdbc:oracle:thin:@ip:port:database --username UID --password pwd --query 'select reverse(t.op) as ROWKEY from  table t WHERE (1=1)  and $CONDITIONS ' --hbase-table test  --column-family test  --hbase-row-key ROWKEY --hbase-create-table -m 8 --split-by ORDER_ID

增量导入测试通过:
sqoop import  --connect jdbc:oracle:thin:@ip:port:database --username UID --password pwd --query 'select reverse(t.op) as ROWKEY from  table t WHERE (1=1)  and $CONDITIONS and t.ID<23' --hbase-table testapp  --column-family test  --hbase-row-key ROWKEY --hbase-create-table -m 8 --split-by ID --hbase-create-table

sqoop import  --connect jdbc:oracle:thin:@ip:port:database --username UID --password pwd --query 'select reverse(t.op) as ROWKEY from  table t WHERE (1=1)  and $CONDITIONS and t.ID<227840' --hbase-table test  --column-family test  --hbase-row-key ROWKEY --hbase-create-table -m 8 --split-by ID --check-column ID --incremental append --last-value 22

注意事项:

  • 使用query参数时,如果sql中包含单引号,那么整个sql需要用双引号包裹起来,$CONDITIONS参数需要用\进行转义,变为\$CONDITIONS,否则会报错。
  • 增量导入的时候check-column指定为数字类型的字符串失败,需要使用数字类型。

1.27更新

检查发现导入hdfs的很多数据和oracle中的对不上,排查了很久发现是-m 8,这个并行量设置的问题(-m 1使用一个map进行导入是正确的数据)。

猜测,可能是–split-by设置的字段造成导入的时候有的数据没有导入,有的数据重复导入,但是我使用的是rownum这个内置的变量,理应是没错的,不得其解。

作者:@小黑

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8月前
|
SQL 关系型数据库 MySQL
Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
【2月更文挑战第9天】Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
312 7
|
SQL Oracle 关系型数据库
Sqoop 数据导入导出实践
Sqoop是一个用来将hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:mysql,oracle,等)中的数据导入到hadoop的HDFS中,也可以将HDFS的数据导入到关系型数据库中。
1930 0
|
关系型数据库 MySQL Java
sqoop操作与使用
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/44543931 sqoop只要安装到集群中的一台节点就可以了 1.
1063 0
|
8月前
|
SQL 分布式计算 监控
Sqoop数据迁移工具使用与优化技巧:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入解析Sqoop的使用、优化及面试策略。内容涵盖Sqoop基础,包括安装配置、命令行操作、与Hadoop生态集成和连接器配置。讨论数据迁移优化技巧,如数据切分、压缩编码、转换过滤及性能监控。此外,还涉及面试中对Sqoop与其他ETL工具的对比、实际项目挑战及未来发展趋势的讨论。通过代码示例展示了从MySQL到HDFS的数据迁移。本文旨在帮助读者在面试中展现Sqoop技术实力。
582 2
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
1480 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
108 3
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
116 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
54 0