需求背景
一个数据同步的经典场景:MySQL Binlog日志可以通过MaxWell、Canal等工具同步,但是离线数据特别是大批量的数据怎么同步呢?这里就可以用到Sqoop(类似工具DataX等),在传统数据库和Hadoop生态存储系统之间的数据迁移,这是一个非常好用的工具。
产品目标
完成离线数据迁移,拉取业务数据库数据。
版本选择
Sqoop-1.4.6
JDK-1.8
MySQL-5.7
Hadoop-2.6.0
概述
Sqoop是Hadoop生态数据存储系统和传统关系型数据库之间进行数据传输的一种工具,其主要特点包含:
并行导入/导出。 基于Yarn框架,在并行性的基础上提供了容错功能;
所有主要RDBMS数据库的连接器都提供。
导入SQL查询的结果。 在HDFS中可以导入从SQL查询返回的结果。支持insert、update模式,可以选择参数,若内容存在就更新,若不存在就插入
递增加载数据。 可在更新时加载部分表内容,Sqoop提供增量加载功能。
全量加载。 可提供select * from table的全量数据加载功能。
Kerberos安全集成。 支持Kerberos身份验证。
数据可直接加载到HBase或Hive。
支持数据压缩。 可使用default(gzip)和-compress参数压缩数据,也可在Hive加载压缩表
主要使用场景为:“Hadoop和关系型数据库服务器之间传送数据”,用于数据的导入和导出。
导入数据:MySQL、Oracle导入数据到Hadoop的HDFS、Hive、HBase等;
导出数据:从Hadoop的文件系统导出数据到关系型数据库MySQL等。
对比类似的组件DataX,Sqoop的独特之处
DataX的主要特点
1、异构数据库和文件系统之间的数据交换;
2、采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问;
3、数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC;
4、开放式的框架,开发者可以在极短的时间开发一个新插件以快速支持新的数据库/文件系统。
Sqoop和DataX的区别
1、sqoop采用map-reduce计算框架进行导入导出,而datax仅仅在运行datax的单台机器上进行数据的抽取和加载,速度比sqoop慢了许多;
2、sqoop只可以在关系型数据库和hadoop组件之间进行数据迁移,而在hadoop相关组件之间,比如hive和hbase之间就无法使用sqoop互相导入导出数据,同时在关系型数据库之间,比如mysql和oracle之间也无法通过sqoop导入导出数据。与之相反,datax能够分别实现关系型数据库和hadoop组件之间、关系型数据库之间、hadoop组件之间的数据迁移;
3、sqoop专门为hadoop而生,对hadoop支持度好,而datax可能会出现不支持高版本hadoop的现象;
4、sqoop只支持官方提供的指定几种关系型数据库和hadoop组件之间的数据交换,而在datax中,用户只需根据自身需求修改文件,生成相应rpm包,自行安装之后就可以使用自己定制的插件;
性能对比
从MySQl到HDFS传输5000w条数据,分别用Sqoop和DataX导入HDFS。Sqoop的平均速率大概为20MB/s,总时间约100秒;DataX的平均速率大概为4.95MB/s,总时间约400秒。
概念和原理
Sqoop底层实现是MapReduce,Sqoop是依赖于Hadoop的,下图描述了Sqoop的工作流程(主要还是连通传统RDBMS和Hadoop存储系统,在这两个生态之间进行的迁移):
Sqoop导入原理
将数据从关系型数据库导入到Hadoop中:
Sqoop与数据库Server通信,获取数据库表的元数据信息;
Sqoop启动一个Map-Only的MR作业,利用元数据信息并行将数据写入Hadoop。
Sqoop在import时,需要制定split-by参数。Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中。
每个map中再处理数据库中获取的一行一行的值,写入到HDFS中。同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。 比如select max(split_by),min(split-by) from得到的max(split-by)和min(split-by)分别为1000和1,而num-mappers为2的话,则会分成两个区域(1,500)和(501-100),同时也会分成2个sql给2个map去进行导入操作,分别为select XXX from table where split-by >= 1 and split-by <= 500和select XXX from table where split-by >= 501 and split-by <= 1000。最后每个map各自获取各自SQL中数据进行导入。
大致流程
读取要导入数据的表结构,生成运行类,默认是QueryResult,打成jar包,然后提交给Hadoop
设置好job,就由Hadoop来执行MapReduce来执行Import命令了,
1)首先要对数据进行切分,也就是DataSplit,DataDrivenDBInputFormat.getSplits(JobContext job)
2)切分好范围后,写入范围,以便读取DataDrivenDBInputFormat.write(DataOutput output),这里是lowerBoundQuery and upperBoundQuery
3)读取以上2)写入的范围DataDrivenDBInputFormat.readFields(DataInput input)
4)然后创建RecordReader从数据库中读取数据DataDrivenDBInputFormat.createRecordReader(InputSplit split,TaskAttemptContext context)
5)创建Map,TextImportMapper.setup(Context context)
6)RecordReader一行一行从关系型数据库中读取数据,设置好Map的Key和Value,交给MapDBRecordReader.nextKeyValue()
7)运行Map,mapTextImportMapper.map(LongWritable key, SqoopRecord val, Context context),最后生成的Key是行数据,由QueryResult生成,Value是NullWritable.get() 。
【导入控制】
Sqoop不需要每次都导入整张表。例如,可以指定导入表的部分列。用户也可以在查询中加入WHERE子句(使用—where参数),以此来限定需要导入的记录。
【导入一致性】
在向HDFS导入数据时,重要的是要确保访问的是数据源的一致性快照。保证一致性的最好方法是在导入时不允许运行任何对表中现有数据进行更新的进程。
【增量导入】
定期运行导入时一种很常见的方式,这样做可以使HDFS的数据与数据库的数据保持同步。为此需要识别哪些是新数据。对于某一行来说,只有当特定列(由—check-column参数指定)的值大于指定值(通过—last-value设置)时,Sqoop才会导入该行数据。
【导入大对象】
“内联”存储大对象,它们会严重影响扫描的性能。因此将大对象与它们的行分开存储。由于大对象单条记录太大,无法在内存中实现物化。为了克服这个困难,当导入大对象数据大于阈值16M时(通过sqoop.inline.lob.length.max设置,以字节为单位),Sqoop将导入的大对象存储在LobFile格式的单独文件中。LobFile格式能够存储非常大的单条记录(使用了64位的地址空间),每条记录保存一个大对象。LobFile格式允许客户端持有对记录的引用,而不访问记录内容,对记录的访问通过java.io.InputStream(用于二进制对象)或java.io.Reader(用于字符对象)来实现的。在导入一条记录时,所有的“正常”字段会在一个文本文件中一起被物化,同时还生成一个指向保存CLOB或BLOB列的LobFile文件的引用。
Sqoop导出原理
Sqoop在执行导出操作之前,Sqoop会根据数据库连接字符串来选择一个导出方法,一般为jdbc。然后,Sqoop会根据目标表的定义生成一个java类。这个生成的类能够从文本文件中解析记录,并能够向表中插入类型合适的值。接着会启动一个MapReduce作业,从HDFS中读取源数据文件,使用生成的类解析记录,并且执行选定的导出方法。
基于jdbc的导出方法会产生一批insert语句,每条语句都会向目标表中插入多条记录。多个单独的线程被用于从HDFS读取数据并与数据库进行通信,以确保涉及不同系统的I/O操作能够尽可能重叠执行。
虽然HDFS读取数据的MapReduce作业大多根据所处理文件的数量和大小来选择并行度(Map任务的数量),但Sqoop的导出工具允许用户明确设定任务的数量。由于导出性能会受并行的数据库写入线程数量的影响,所以Sqoop使用CombineFileInput类将输入文件分组分配给少数几个Map任务去执行。
配置
解压sqoop-1.4.6-cdh5.7.0.tar.gz之后,首先需要把mysql的驱动jar包放入sqoop的lib目录下,还要添加环境变量SQOOP_HOME到/etc/profile,然后修改conf/sqoop-env.sh文件,添加各个组件的原始路径(此处附带自己的配置如下):
#Set path to where bin/hadoop is available export HADOOP_COMMON_HOME=/Users/hiwes/app/hadoop-2.6.0-cdh5.7.0 #Set path to where hadoop-*-core.jar is available export HADOOP_MAPRED_HOME=/Users/hiwes/app/hadoop-2.6.0-cdh5.7.0 #set the path to where bin/hbase is available export HBASE_HOME=/Users/hiwes/app/hbase-1.2.0-cdh5.7.0 #Set the path to where bin/hive is available export HIVE_HOME=/Users/hiwes/app/hive-1.1.0-cdh5.7.0 #Set the path for where zookeper config dir is export ZOOCFGDIR=/Users/hiwes/app/zookeeper-3.4.5-cdh5.7.0 export ACCUMULO_HOME=/Users/hiwes/app/sqoop-1.4.6-cdh5.7.0/tmp/accumulo export HCAT_HOME=/Users/hiwes/app/sqoop-1.4.6-cdh5.7.0/tmp/hcatalog export ZOOKEEPER_HOME=/Users/hiwes/app/zookeeper-3.4.5-cdh5.7.0
注意观察sqoop-site.xml文件,这个文件中其实很多都是默认配置选项,其实是不需要另外改动的。
使用
1、列出mysql数据库中所有数据库。 # bin/sqoop list-databases -connect jdbc:mysql://hiwes:3306/ -username root -password root 2、列出指定数据库下所有数据表。 # bin/sqoop list-tables -connect jdbc:mysql://hiwes:3306/test -username root -password root 3、复制关系型数据库中表结构到hive,只复制表结构不复制内容。 # bin/sqoop create-hive-table -connect jdbc:mysql://hiwes:3306/test -username root -password root -table user -hive-database test -hive-table user 4、从关系型数据库导入数据到hive中(hive可以不提前创建表)。 # bin/sqoop import -connect jdbc:mysql://hiwes:3306/test -username root -password root -table user -hive-import -hive-database test -hive-table user -fields-terminated-by ',' -hive-overrite -m 1 5、将hive数据导入到关系型数据库,在导入前mysql表需要提前创建。 # bin/sqoop export -connect jdbc:mysql://hiwes:3306/test_canal -username root -password root -database test_canal -table user --fields-terminated-by ',' -export-dir /user/hive/warehouse/test.db/user/part-m-00000 6、从数据库导出表的数据到HDFS上文件。 # bin/sqoop import -connect jdbc:mysql://hiwes:3306/test -username root -password root -table user -m 1 -hive-overwrite -fields-terminated-by '|' -target-dir /user/test 7、从数据库增量导入表数据到HDFS中。 # bin/sqoop import --connect jdbc:mysql://hiwes:3306/test -username root -password root -table HADOOP_USER_INFO -m 1 -target-dir /uset/test -check-column id -incremental append -last-value 5 # 最终会把最终值5之后的数据导入到hdfs。 其他使用待续,后续内容会追加
其他常见参数:
import:从MySQL导入到HDFS文件系统数据
–connect:数据库JDBC连接字符串
–username:数据库用户名
–password:数据库密码
–table:数据库表名
–columns:数据库列名
–where: 查询条件
–query: 指定查询sql
–delete-target-dir: 导入后删除hdfs的目录
–num-mappers n: 指定map数量=n,可以简写为 -m n
–hive-import: 导入hive
–hive-database: hive的database
–hive-table: hive表
–hive-partition-key: 分区字段
–hive-partition-value: 分区值
–hive-overwrite: 覆盖数据
实际上import命令,从MySQL导入到HDFS文的背后依然是执行的MapReduce。执行完map后,又执行了load data。
如果导出的数据库是mysql 则可以添加一个 属性 –direct ,加了 direct 属性在导出mysql数据库表中的数据会快一点 执行的是mysq自带的导出功能
常见错误
1、hdfs文件的权限问题。
根据个人权限来考虑问题,比如你只是hdfs使用者之一,那么就只能用hdfs用户执行命令,所以在~/.bash_profile文件中添加:
export HADOOP_USER_NAME=hdfs
2、文件格式问题。
hive sequencefile导入文件遇到FAILED: SemanticException Unable to load data to destination table. Error: The file that you are trying to load does not match the file format of the destination table.错误
原因:因为SequenceFile的表不能用load加载数据,只能导入Sequence类型的数据
解决办法:
① 先创建一个临时表(save as textfile),然后将数据导入;
② 然后再导入这个表里 insert into table test_sq select * from test_text;
3、报错:ERROR tool.ImportTool: Error during import: No primary key could be found for table TRANS_GJJY02. Please specify one with –split-by or perform a sequential import with ‘-m 1’.
解决方案:
① 老老实实在表里面加主键,然后再执行导入语句,就不会出错;
② 有的数据无法设置主键,比如监测记录数据找不到唯一值,就根据上面的提示处理:
2.1)将map个数设置为1:Sqoop默认为4;
-m 1
2.2)使用-split-by,后面跟上表的最后一列名字,从而对数据进行分行(推荐使用):
-split-by column1
4、Output directory already exists错误。
解决办法:增加配置参数 -delete-target-dir \
总结
Sqoop在对Hadoop生态和传统数据库之间的数据迁移做的是非常好的,但是代价也是仅仅基于Hadoop生态做得好。其实也很正常,这种组件你不可能要求所有的功能全都实现,一站式服务直接在所有存储组件里面进行数据迁移,基于不同的优先级,考虑最适合自己的组件即可,而且最关键的Sqoop简单啊,虽然底层是封装的MR任务,但是整体的速度还是比较快的。所以:当前场景下我们要同步业务库历史数据到Hive或HBase的时候,是可以直接使用Sqoop来完成迁移,效率还是能满足当前的业务需求的。