HBase数据迁移详解!

简介: 笔记

一、数据迁移方案


数据迁移,更多的场景是外部的数据源如何将数据写入到HBase

1.数据库RDBMS

1)sqoop 
2)kettle ETL工具
3)其他方式
  **写程序
  **导出文件加载

2.数据文件(log)

1)flume:实时数据收集,将数据的数据插入到HBase
  source,channel,sink
2)MapReduce
  input file -> mr  -> hbase table
3)completebulkload(常用)
  input file -> mr  -> hfile -> completebulkload -> hbase table


二、数据迁移实施


(1)通过importtsv命令

通过importtsv命令,将文件直接导入到HBase

importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

准备数据:stu.tsv

0001    henry   20      city-1
0002    cherry  30      city-2
0003    alex    29      city-3

将数据放在HDFS上

bin/hdfs dfs -put /opt/datas/stu.tsv /user/caizhengjie/datas
export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
stutsv \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/stu.tsv

运行结果:

hbase(main):004:0> scan 'stutsv'
ROW                                  COLUMN+CELL                                                                                            
 0001                                column=info:address, timestamp=1605020174889, value=city-1                                             
 0001                                column=info:age, timestamp=1605020174889, value=20                                                     
 0001                                column=info:username, timestamp=1605020174889, value=henry                                             
 0002                                column=info:address, timestamp=1605020174889, value=city-2                                             
 0002                                column=info:age, timestamp=1605020174889, value=30                                                     
 0002                                column=info:username, timestamp=1605020174889, value=cherry                                            
 0003                                column=info:address, timestamp=1605020174889, value=city-3                                             
 0003                                column=info:age, timestamp=1605020174889, value=29                                                     
 0003                                column=info:username, timestamp=1605020174889, value=alex    


(2)通过importtsv命令+completebulkload

importtsv  -Dimporttsv.bulk.output=/path/for/output

通过此命令,我们可以将外部的数据文件直接生成一个HFfile文件,然后通过completebulkload直接加载到HBase数据表中

执行流程:

log文件  ->  HFfile文件  ->HBase table表中

第一步:

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01:9000/user/caizhengjie/hfoutput \
stutsv \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/stu.tsv

这时在HDFS上会生成HFfile文件,会放在hfoutput里面

第二步:

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar completebulkload \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/hfoutput \
stutsv

这时HDFS上的HFfile文件会写入到HBase中

运行结果:

hbase(main):003:0> scan 'stutsv'
ROW                                  COLUMN+CELL                                                                                            
 0001                                column=info:address, timestamp=1605091109579, value=city-1                                             
 0001                                column=info:age, timestamp=1605091109579, value=20                                                     
 0001                                column=info:username, timestamp=1605091109579, value=henry                                             
 0002                                column=info:address, timestamp=1605091109579, value=city-2                                             
 0002                                column=info:age, timestamp=1605091109579, value=30                                                     
 0002                                column=info:username, timestamp=1605091109579, value=cherry                                            
 0003                                column=info:address, timestamp=1605091109579, value=city-3                                             
 0003                                column=info:age, timestamp=1605091109579, value=29                                                     
 0003                                column=info:username, timestamp=1605091109579, value=alex                                              
3 row(s) in 0.1060 seconds


(3)不同文件中数据分割符的处理

在实际业务中,我们的数据不可能都是tab键分割开来的,也会出现csv格式的文件,那么我们会根据不同的业务场景,对不同文件中数据分割符做处理。

'-Dimporttsv.separator=|'

准备数据:按照逗号分隔的数据

0001,henry,20,city-1
0002,cherry,30,city-2
0003,alex,28,city-3
0004,lili,35,city-4
0005,jack,18,city-5

将数据上传到HDFS

bin/hdfs dfs -put /opt/datas/stu.csv /user/caizhengjie/datas

第一步:生成HFfile文件

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01:9000/user/caizhengjie/hfcsv \
-Dimporttsv.separator=, \
stutsv \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/stu.csv

HFfile文件会放在HDFS的/user/caizhengjie/hfcsv目录下

第二步:加载数据,将HFfile文件写入到HBase中

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar completebulkload \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/hfcsv \
stutsv

运行结果:

hbase(main):007:0> scan 'stutsv'
ROW                                  COLUMN+CELL                                                                                            
 0001                                column=info:address, timestamp=1605096866374, value=city-1                                             
 0001                                column=info:age, timestamp=1605096866374, value=20                                                     
 0001                                column=info:username, timestamp=1605096866374, value=henry                                             
 0002                                column=info:address, timestamp=1605096866374, value=city-2                                             
 0002                                column=info:age, timestamp=1605096866374, value=30                                                     
 0002                                column=info:username, timestamp=1605096866374, value=cherry                                            
 0003                                column=info:address, timestamp=1605096866374, value=city-3                                             
 0003                                column=info:age, timestamp=1605096866374, value=28                                                     
 0003                                column=info:username, timestamp=1605096866374, value=alex                                              
 0004                                column=info:address, timestamp=1605096866374, value=city-4                                             
 0004                                column=info:age, timestamp=1605096866374, value=35                                                     
 0004                                column=info:username, timestamp=1605096866374, value=lili                                              
 0005                                column=info:address, timestamp=1605096866374, value=city-5                                             
 0005                                column=info:age, timestamp=1605096866374, value=18                                                     
 0005                                column=info:username, timestamp=1605096866374, value=jack    


(4)自定义MR程序生成HFfile文件(企业常用的方案)

第一步:编写MR数据迁移程序

log文件  ->  HFfile文件
package com.kfk.hbase;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/10
 * @time : 3:31 下午
 */
public class HBaseConstant {
    public static String HBASE_TABLE = "stu";
    public static String HBASE_CF_INFO = "info";
}
package com.kfk.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class HBaseMRHF extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
        // rowkey,username,age,addres
        // 0001,henry,20,city-1
        String[] COLUMN = new String[]{
            "rowkey","username","age","addres"
        };
        // ImmutableBytesWritable为KEYOUT输出的Key的类型, Put为VALUEOUT输出的Value的类型
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
        @Override
        public void map(LongWritable key, Text lines, Context context) throws IOException, InterruptedException {
            // 将每一行数据按逗号分开,放入数组中
            String[] values = lines.toString().split(",");
            // set rowkey
            rowkey.set(Bytes.toBytes(values[0]));
            Put put = new Put(rowkey.get());
            for (int index = 1;index < values.length;index++){
                put.addImmutable(Bytes.toBytes(HBaseConstant.HBASE_CF_INFO),Bytes.toBytes(COLUMN[index]),Bytes.toBytes(values[index]));
            }
            context.write(rowkey,put);
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        TableName tableName = TableName.valueOf("stu");
        // 创建一个链接
        Connection connection = ConnectionFactory.createConnection();
        // 获取数据表
        Table table = connection.getTable(tableName);
        RegionLocator regionLocator = connection.getRegionLocator(tableName);
        HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
//        args = new String[]{
//            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/stu.csv",
//            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/hfcsv-output"
//        };
        Configuration configuration = HBaseConfiguration.create();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new HBaseMRHF(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

打包并上传至服务器

第二步:生成HFfile文件

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar /opt/jars/hbase_mrhf.jar \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/stu.csv hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/hfcsv-output

第三步:加载数据,将HFfile文件写入到HBase中

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar completebulkload \
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/hfcsv-output \
stutsv

运行结果:

hbase(main):007:0> scan 'stutsv'
ROW                                     COLUMN+CELL                                                                                                      
 0001                                   column=info:addres, timestamp=1605149649495, value=city-1                                                        
 0001                                   column=info:age, timestamp=1605149649495, value=20                                                               
 0001                                   column=info:username, timestamp=1605149649495, value=henry                                                       
 0002                                   column=info:addres, timestamp=1605149649495, value=city-2                                                        
 0002                                   column=info:age, timestamp=1605149649495, value=30                                                               
 0002                                   column=info:username, timestamp=1605149649495, value=cherry                                                      
 0003                                   column=info:addres, timestamp=1605149649495, value=city-3                                                        
 0003                                   column=info:age, timestamp=1605149649495, value=28                                                               
 0003                                   column=info:username, timestamp=1605149649495, value=alex                                                        
 0004                                   column=info:addres, timestamp=1605149649495, value=city-4                                                        
 0004                                   column=info:age, timestamp=1605149649495, value=35                                                               
 0004                                   column=info:username, timestamp=1605149649495, value=lili                                                        
 0005                                   column=info:addres, timestamp=1605149649495, value=city-5                                                        
 0005                                   column=info:age, timestamp=1605149649495, value=18                                                               
 0005                                   column=info:username, timestamp=1605149649495, value=jack                                                        
5 row(s) in 0.3760 seconds


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
机器学习/深度学习 分布式计算 Hadoop
一种HBase表数据迁移方法的优化
一种HBase表数据迁移方法的优化
80 0
|
存储 分布式计算 Hadoop
Hbase 数据迁移闭坑指南
Hbase 数据迁移闭坑指南
536 0
|
运维 监控 分布式数据库
BDS - HBase数据迁移同步方案的设计与实践
目前在阿里云上,BDS是如何进行HBase集群之间的数据迁移和数据的实时同步的
6859 0
BDS - HBase数据迁移同步方案的设计与实践
|
存储 分布式计算 Shell
EMR(hadoop/hbase/phoenix夸集群数据迁移采坑记录)
一、概述: Hbase(Phoenix)数据迁移方案主要分为 Hadoop层面(distcp)、及Hbase层面(copyTable、export/import、snapshot) 二、以下针对distcp方案详细说明(以亲测阿里EMR为例): st...
2570 0
|
存储 分布式计算 监控
HBase存储剖析与数据迁移
1.概述   HBase的存储结构和关系型数据库不一样,HBase面向半结构化数据进行存储。所以,对于结构化的SQL语言查询,HBase自身并没有接口支持。在大数据应用中,虽然也有SQL查询引擎可以查询HBase,比如Phoenix、Drill这类。
1513 0
|
分布式计算 Hadoop 分布式数据库
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
131 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop节点文件存储HBase设计目的
【6月更文挑战第2天】
47 6
|
3月前
|
存储 分布式计算 Hadoop
Hadoop节点文件存储Hbase高可靠性
【6月更文挑战第2天】
59 2