实现HBase表和RDB表的转化(附Java源码资源)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 该文介绍了如何将数据从RDB转换为HBase表,主要涉及三个来源:RDB Table、Client API和Files。文章重点讲解了RDB到HBase的转换,通过批处理思想,利用RDB接口批量导出数据并转化为`List<Put>`,然后导入HBase。目录结构包括配置文件、RDB接口及实现类、HBase接口及实现类,以及一个通用转换器接口和实现。代码中,`RDBImpl`负责从RDB读取数据并构造`Put`对象,`HBaseImpl`则负责将`Put`写入HBase表。整个过程通过配置文件`transfer.properties`管理HBase和RDB的映射关系。

实现HBase表和RDB表的转化

image.png
image.png

一、引入

转化为HBase表的三大来源:RDB Table、Client API、Files
image.png

如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。

首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:

rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){
   
   
    List<Put> batch = rdb.nextBatch();
    hbase.putBatch(batch);
}
hbase.close();
rdb.close();

二、代码讲解

1. 目录结构

image.png

f3041d94e63b3374665d8afe3e3.png#pic_center)

2. 具体实现
  • transfer.properties

内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
image.png

  1. 在==Run/Debug Configurations==中,新建一个==Application==
  2. 配置好主类
  3. 配置好配置文件的具体路径
  • RDB 接口
public interface RDB extends Com {
   
   
    // 要提升性能,需要使用批处理
    boolean hasNextBatch() throws SQLException;// 是否存在下一个批次
    List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
  • RDB 实现类
public class RDBImpl implements RDB {
   
   
    private static Logger logger = Logger.getLogger(RDBImpl.class);
    // JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集
    private Properties config;
    /**
     * 它们需要设置成全局变量的原因是它们需要共享
     */
    private Connection con;
    private PreparedStatement pst;
    private ResultSet rst;
    // 定义每个批次处理的记录数的最大数量
    private int batchSize;
    // hbase的行键对应rdb的列的列名
    private String hbaseRowKeyRdbCol;
    private Map<String,Map<String,String>> hbaseRdbColMapping;

    // RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())
    public RDBImpl(Properties config) {
   
   
        this.config = config;
    }

    @Override
    public Properties config() {
   
   
        return config;
    }

    /**
     * 内部资源初始化
     */
    @Override
    public void init() throws Exception{
   
   
        con = getConnection();
        logger.info("RDB 创建 [ 连接 ] 对象成功");
        pst = getStatement(con);
        logger.info("RDB 创建 [ 执行 ] 对象成功");
        rst = getResult(pst);
        logger.info("RDB 创建 [ 结果集 ] 成功");
        batchSize = batchSize();
        hbaseRdbColMapping = hbaseRdbColumnsMapping();
    }

    @Override
    public void close() {
   
   
        closeAll(rst,pst,con);
    }


    private String driver(){
   
   
        return checkAndGetConfig("rdb.driver");
    }

    private String url(){
   
   
        return checkAndGetConfig("rdb.url");
    }

    private String username(){
   
   
        return checkAndGetConfig("rdb.username");
    }

    private String password(){
   
   
        return checkAndGetConfig("rdb.password");
    }

    private String sql(){
   
   
        return checkAndGetConfig("rdb.sql");
    }

    private int batchSize(){
   
   
        return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));
    }

    // java.sql下的Connection
    private Connection getConnection() throws ClassNotFoundException, SQLException {
   
   
        // 装载驱动
        Class.forName(driver());
        // 获取并返回连接对象
        return DriverManager.getConnection(url(),username(),password());
    }
    private PreparedStatement getStatement(Connection con) throws SQLException {
   
   
        return con.prepareStatement(sql());
    }
    private ResultSet getResult(PreparedStatement statement) throws SQLException {
   
   
        return statement.executeQuery();
    }
    /**
     * hbase 列族和列与rdb中列的映射关系
     *             hbase列族   hbase列  rdb列
     * @return Map<String,Map<String,String>>
     */
    private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){
   
   
        String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");
        Map<String,Map<String,String>> map = new HashMap<>();
        String[] pss = mapping.split(",");
        for(String ps : pss){
   
   
            String[] pp = ps.split("->");
            String[] p = pp[0].split(":");
            String rdbCol = pp[1],hbaseColFamily,hbaseColName;
            if(p.length==1){
   
   
                hbaseRowKeyRdbCol = pp[1];
            }else {
   
   
                hbaseColFamily = p[0];
                hbaseColName = p[1];
                if(!map.containsKey(hbaseColFamily)){
   
   
                    map.put(hbaseColFamily,new HashMap<>());
                }
                map.get(hbaseColFamily).put(hbaseColName,rdbCol);
            }
        }
        return map;
    }

    /**
     * 将RDB的列转化为字节数组(需要确定列的数据类型)
     * @param rdbColumn
     * @return
     * @throws SQLException
     */

    private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {
   
   
        Object obj = rst.getObject(rdbColumn);
        if(obj instanceof String){
   
   
            return Bytes.toBytes((String)obj);
        } else if(obj instanceof Float){
   
   
            return Bytes.toBytes(((Float)obj).floatValue());
        } else if(obj instanceof Double){
   
   
            return Bytes.toBytes(((Double)obj).doubleValue());
        } else if(obj instanceof BigDecimal){
   
   
            return Bytes.toBytes((BigDecimal)obj);
        } else if(obj instanceof Short){
   
   
            return Bytes.toBytes(((Short) obj).shortValue());
        } else if(obj instanceof Integer){
   
   
            return Bytes.toBytes(((Integer)obj).intValue());
        } else if(obj instanceof Boolean){
   
   
            return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());
        } else {
   
   
            throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());
        }
    }

    /**
     * 将HBase的列名或列族名转化为字节数组
     * @param name
     * @return
     */
    private byte[] toBytes(String name){
   
   
        return Bytes.toBytes(name);
    }

    // 最后一个批次的数据最少有一条
    @Override
    public boolean hasNextBatch() throws SQLException{
   
   
        return rst.next();
    }

    @Override
    public List<Put> nextBatch() throws SQLException{
   
   
        // 预先分配容量
        List<Put> list = new ArrayList<>(batchSize);
        int count = 0;
        do{
   
   
            /**
             * 如何将一行解析为多个put(结合配置文件)
             * 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名
             */
            Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));
            for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {
   
   
                String columnFamily = e.getKey();
                for (Map.Entry<String, String> s : e.getValue().entrySet()) {
   
   
                    String hbaseColumn = s.getKey();
                    String rdbColumn = s.getValue();
                    // 需要将内容转变为字节数组传入方法
                    put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));
                }
            }
            list.add(put);
        }while(++count<batchSize && rst.next());
        return list;
    }

}

如何理解一行转化为多个put?
image.png

结果集的实质?
image.png

==rst.next()== 的两个作用

rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行

a. 只通过config作为参数构造rdb
b. 以==JDBC==为核心,需要==连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集==,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出==RDB列名,HBase列族名,HBase列名==,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个==Put对象==,由于构造==Put对象==只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。

  • HBase接口

    public interface HBase extends Com {
         
         
      // RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。
      void putBatch(List<Put> batch) throws IOException;
    }
    
  • HBase实现类

public class HBaseImpl implements HBase {
   
   
    private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);
    private Properties config;
    private Connection con;
    private Table hbaseTable;


    public HBaseImpl(Properties config) {
   
   
        this.config = config;
    }

    @Override
    public Properties config() {
   
   
        return config;
    }

    @Override
    public void init() throws Exception {
   
   
        con = getCon();
        loggerHBase.info("HBase 创建 [ 连接 ] 成功");
        hbaseTable = checkAndGetTable(con);
        loggerHBase.info("HBase 创建 [ 数据表 ] 成功");
    }

    @Override
    public void close() {
   
   
        closeAll(hbaseTable,con);
    }

    private String tableName(){
   
   
        return checkAndGetConfig("hbase.table.name");
    }
    private String zkUrl(){
   
   
        return checkAndGetConfig("hbase.zk");
    }

    private Connection getCon() throws IOException {
   
   
        // hadoop.conf的configuration
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",zkUrl());
        return ConnectionFactory.createConnection(config);
    }

    private Table checkAndGetTable(Connection con) throws IOException {
   
   
        /**
         * Admin : HBase DDL
         */
        Admin admin = con.getAdmin();
        TableName tableName = TableName.valueOf(tableName());
        // 通过tableName判定表是否存在
        if(!admin.tableExists(tableName)){
   
   
            throw new IOException("HBase表不存在异常:"+tableName);
        }
        /**
         * Table : HBase DML & DQL
         */
        // 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)
        return con.getTable(tableName);
    }

    @Override
    public void putBatch(List<Put> batch) throws IOException{
   
   
        hbaseTable.put(batch);
    }
}

HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个==Table对象==,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。

  • Com接口
public interface Com {
   
   
    Logger logger = Logger.getLogger(Com.class);
    // 获取配置对象
    Properties config();

    // 初始化资源
    void init() throws Exception;

    // 释放资源
    void close();

    default String checkAndGetConfig(String key){
   
   
        if(!config().containsKey(key)){
   
   
            // 因为该方法可能被用于HBase和RDB
            throw new RuntimeException("配置项缺失异常:"+key);
        }
        String item = config().getProperty(key);
        logger.info(String.format("获取配置项 %s : %s",key,item));
        return item;
    }

    default void closeAll(AutoCloseable...acs){
   
   
        for (AutoCloseable ac : acs) {
   
   
            if (Objects.nonNull(ac)) {
   
   
                try {
   
   
                    ac.close();
                    logger.info(String.format("释放 %s 成功",ac.getClass().getName()));
                } catch (Exception e) {
   
   
                    logger.error("释放资源异常:"+e);
                }
            }
        }
    }
}

在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。

  • RDBToHBase接口
public interface RDBToHBase {
   
   
    // 创建一个RDB对象
    void setRDB(RDB rdb);
    // 创建一个HBase对象
    void setHBase(HBase hbase);
    // 进行数据的传输
    void startTransfer();
}
  • RDBToHBase实现类

    public class RDBToHBaseImpl implements RDBToHBase {
         
         
      // 日志显示
      private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);
      private RDB rdb;
      private HBase hbase;
    
      @Override
      public void setRDB(RDB rdb) {
         
         
          this.rdb = rdb;
      }
    
      @Override
      public void setHBase(HBase hbase) {
         
         
          this.hbase = hbase;
      }
    
      @Override
      public void startTransfer() {
         
         
          try {
         
         
              rdb.init();
              loggerRH.info("RDB 初始化成功");
              hbase.init();
              loggerRH.info("HBase 初始化成功");
              loggerRH.info("数据从 RDB 迁移至 HBase 开始...");
              int count = 0;
              while (rdb.hasNextBatch()) {
         
         
                  final List<Put> batch = rdb.nextBatch();
                  hbase.putBatch(batch);
                  loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));
              }
              loggerRH.info("数据从 RDB 迁移至 HBase 结束...");
          } catch (Exception e){
         
         
              loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);
          } finally{
         
         
              hbase.close();
              rdb.close();
          }
      }
    }
    
  • AppRDBToHBase 实现类
public class AppRDBToHBase
{
   
   
    private static Logger logger = Logger.getLogger(AppRDBToHBase.class);
    private static void start(String[] args){
   
   
        try {
   
   
            if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {
   
   
                throw new NullPointerException("配置文件路径空指针异常");
            }
            final String PATH = args[0];
            final File file = new File(PATH);
            if (!file.exists() || file.length() == 0 || !file.canRead()) {
   
   
                throw new IOException("配置文件不存在、不可读、空白");
            }
            Properties config = new Properties();
            // final String path = args[0];
            config.load(new FileReader(file));

            RDB rdb = new RDBImpl(config);
            HBase hBase = new HBaseImpl(config);
            RDBToHBase rdbToHBase = new RDBToHBaseImpl();
            rdbToHBase.setRDB(rdb);
            rdbToHBase.setHBase(hBase);
            rdbToHBase.startTransfer();
        }catch(Exception e){
   
   
            logger.error("配置异常",e);
        }
    }
    public static void main( String[] args ) {
   
   
        start(args);
    }
}

对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输

其他:日志文件系统Log.4j的应用
  • 准备:需要在Resources模块下配置==log4j.properties==文件
  • 注意:
    • 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() ,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。
    • log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
    • 对于不同类或接口下的logger,需要注意进行名字的区分。
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
18天前
|
Java 程序员
JAVA程序员的进阶之路:掌握URL与URLConnection,轻松玩转网络资源!
在Java编程中,网络资源的获取与处理至关重要。本文介绍了如何使用URL与URLConnection高效、准确地获取网络资源。首先,通过`java.net.URL`类定位网络资源;其次,利用`URLConnection`类实现资源的读取与写入。文章还提供了最佳实践,包括异常处理、连接池、超时设置和请求头与响应头的合理配置,帮助Java程序员提升技能,应对复杂网络编程场景。
41 9
|
1月前
|
Java Apache Maven
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
文章提供了使用Apache POI库在Java中创建和读取Excel文件的详细代码示例,包括写入数据到Excel和从Excel读取数据的方法。
59 6
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
|
28天前
|
Java
Java开发实现图片地址检验,如果无法找到资源则使用默认图片,如何编码?
【10月更文挑战第14天】Java开发实现图片地址检验,如果无法找到资源则使用默认图片,如何编码?
52 2
|
2月前
|
数据采集 运维 前端开发
【Java】全套云HIS源码包含EMR、LIS (医院信息化建设)
系统技术特点:采用前后端分离架构,前端由Angular、JavaScript开发;后端使用Java语言开发。
79 5
|
2月前
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
90 4
|
7天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
24 3
|
18天前
|
Java 开发者
JAVA高手必备:URL与URLConnection,解锁网络资源的终极秘籍!
在Java网络编程中,URL和URLConnection是两大关键技术,能够帮助开发者轻松处理网络资源。本文通过两个案例,深入解析了如何使用URL和URLConnection从网站抓取数据和发送POST请求上传数据,助力你成为真正的JAVA高手。
37 11
|
12天前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
41 3
|
18天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。