HBase——使用Put迁移MySql数据到Hbase

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 先上code:/** * 功能:迁移mysql上电池历史数据到hbase * Created by liuhuichao on 2016/12/6. */public class MySqlToHBase { /** * 获取表 * @param...



先上code:



/**
 * 功能:迁移mysql上电池历史数据到hbase
 * Created by liuhuichao on 2016/12/6.
 */
public class MySqlToHBase {

    /**
     * 获取表
     * @param tableName
     * @return
     * @throws IOException
     */
    private  HTable connectHBase(String tableName) throws IOException{
        HTable table=null;
        Configuration conf= HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","lhc-centos");
        table=new HTable(conf,tableName);
        return table;
    }

    /**
     * 获取mysql连接
     * @return
     * @throws Exception
     */
    private  java.sql.Connection connectDB() throws  Exception{
        String userName="root";
        String password="root";
        String url="jdbc:mysql://10.0.1.42:3306/energy?useUnicode=true&characterEncoding=UTF-8";
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        java.sql.Connection conn=DriverManager .getConnection(url,userName,password);
        return  conn;
    }

    /**
     * 将电池历史数据表中输入导入HBase
     * @throws Exception
     */
    @Test
    public void exportFromMySqlToHBase() throws Exception{
        java.sql.Connection dbConn=null;
        HTable table=null;

        Statement stmt=null;
        String strQuery="SELECT * FROM `res_battery_data_history` limit 10000,1000000";
        dbConn=connectDB();//连接mysql
        table=connectHBase("batteryDataHistory");//连接HBase
        try{

            stmt=dbConn.createStatement();
            ResultSet rs=stmt.executeQuery(strQuery);
            long beginTime=System.currentTimeMillis();//开始
            System.out.println( "beginTime:---"+beginTime);
            while (rs.next()) {
                UUID uuid = UUID.randomUUID();
                String rowKey = uuid.toString();//作为行健
                Put put = new Put(Bytes.toBytes(rowKey));
                /**
                 * family:baseData
                 */
                // Integer id=rs.getInt("id");
                String batteryNo = rs.getString("battery_no");
                Integer batteryType = rs.getInt("battery_type");
                Float voltageDeviation = rs.getFloat("voltage_deviation");
                Float totalVoltage = rs.getFloat("total_voltage");
                Float temprature1 = rs.getFloat("temprature1");
                Float temprature2 = rs.getFloat("temprature2");
                Float chargeNum = rs.getFloat("charge_num");
                Float longtitude = rs.getFloat("longtitude");
                Float latitude = rs.getFloat("latitude");
                Float totalCurrent = rs.getFloat("total_current");
                Float soc = rs.getFloat("soc");

                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryNo"), Bytes.toBytes(batteryNo));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryType"), Bytes.toBytes(batteryType));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("voltageDeviation"), Bytes.toBytes(voltageDeviation));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalVoltage"), Bytes.toBytes(totalVoltage));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature1"), Bytes.toBytes(temprature1));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature2"), Bytes.toBytes(temprature2));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("chargeNum"), Bytes.toBytes(chargeNum));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("longtitude"), Bytes.toBytes(longtitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("latitude"), Bytes.toBytes(latitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalCurrent"), Bytes.toBytes(totalCurrent));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("soc"), Bytes.toBytes(soc));
                /**
                 * family:volumnData
                 */
                Float vol1 = rs.getFloat("vol1");
                Float vol2 = rs.getFloat("vol2");
                Float vol3 = rs.getFloat("vol3");
                Float vol4 = rs.getFloat("vol4");
                Float vol5 = rs.getFloat("vol5");
                Float vol6 = rs.getFloat("vol6");
                Float vol7 = rs.getFloat("vol7");
                Float vol8 = rs.getFloat("vol8");
                Float vol9 = rs.getFloat("vol9");
                Float vol10 = rs.getFloat("vol10");
                Float vol11 = rs.getFloat("vol11");
                Float vol12 = rs.getFloat("vol12");
                Float vol13 = rs.getFloat("vol13");
                Float vol14 = rs.getFloat("vol14");
                Float vol15 = rs.getFloat("vol15");
                Float vol16 = rs.getFloat("vol16");
                Float vol17 = rs.getFloat("vol17");
                Float vol18 = rs.getFloat("vol18");
                Float vol19 = rs.getFloat("vol19");
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v1"), Bytes.toBytes(vol1));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v2"), Bytes.toBytes(vol2));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v3"), Bytes.toBytes(vol3));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v4"), Bytes.toBytes(vol4));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v5"), Bytes.toBytes(vol5));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v6"), Bytes.toBytes(vol6));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v7"), Bytes.toBytes(vol7));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v8"), Bytes.toBytes(vol8));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v9"), Bytes.toBytes(vol9));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v10"), Bytes.toBytes(vol10));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v11"), Bytes.toBytes(vol11));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v12"), Bytes.toBytes(vol12));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v13"), Bytes.toBytes(vol13));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v14"), Bytes.toBytes(vol14));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v15"), Bytes.toBytes(vol15));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v16"), Bytes.toBytes(vol16));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v17"), Bytes.toBytes(vol17));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v18"), Bytes.toBytes(vol18));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v19"), Bytes.toBytes(vol19));
                /**
                 * family:extraData
                 */
                String remarks = rs.getString("remarks");
                String testUserName = rs.getString("test_user_name");
                String createTime = rs.getString("create_time");
                Integer createUser = rs.getInt("create_user");
                Integer source = rs.getInt("source");
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("remarks"), Bytes.toBytes(remarks));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("testUserName"), Bytes.toBytes(testUserName));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createTime"), Bytes.toBytes(createTime));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createUser"), Bytes.toBytes(createUser));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("source"), Bytes.toBytes(source));
                table.put(put);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try{
                if(stmt !=null){
                    stmt.close();
                }
                if(dbConn !=null){
                    dbConn.close();
                }
                if(table!=null){
                    table.close();
                }
                long endTime=System.currentTimeMillis();//开始
                System.out.println( "endTime:---"+endTime);
            }catch(Exception e){
                e.printStackTrace();
            }
        }








    }
}




   结果:10000 row(s) in 218.0310 seconds

      (我是单机版的HBase,未使用HDFS做底层存储。)

     刚开始的sql没有加limit,直接上来把测试表里面的三百多万条直接插出来,结果还没查完,就堆栈溢出了;额。。。后来先塞了一万条到HBase,额,还好,速度很快,但是感觉这种导入方式不具有通用性,对于大量数据的导入是不ok的。






目录
相关文章
|
7月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。
|
11月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
8月前
|
存储 运维 关系型数据库
从MySQL到云数据库,数据库迁移真的有必要吗?
本文探讨了企业在业务增长背景下,是否应从 MySQL 迁移至云数据库的决策问题。分析了 MySQL 的优势与瓶颈,对比了云数据库在存储计算分离、自动化运维、多负载支持等方面的优势,并提出判断迁移必要性的五个关键问题及实施路径,帮助企业理性决策并落地迁移方案。
|
10月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
1569 1
|
9月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
488 0
|
8月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
640 10
|
10月前
|
关系型数据库 MySQL 数据库
MySQL数据库上云迁移
本文介绍了将数据库迁移到RDS for Mysql的两种主要方法:停服迁移和不停服迁移。停服迁移适合可短暂中断服务的场景,通过mysqldump或DTS完成;不停服迁移适用于需保持业务连续性的场景,推荐使用DTS实现结构、全量及增量数据迁移。文中详细列出了每种方法的具体操作步骤,帮助企业根据需求选择合适的迁移方案。
332 1
MySQL数据库上云迁移
|
9月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
248 0
|
关系型数据库 MySQL Linux
在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾
以上就是在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾的步骤。这个过程就像是一场接力赛,数据从MySQL数据库中接力棒一样传递到备份文件,再从备份文件传递到其他服务器,最后再传递回MySQL数据库。这样,即使在灾难发生时,我们也可以快速恢复数据,保证业务的正常运行。
532 28
|
11月前
|
存储 SQL 缓存
mysql数据引擎有哪些
MySQL 提供了多种存储引擎,每种引擎都有其独特的特点和适用场景。以下是一些常见的 MySQL 存储引擎及其特点:
295 0

推荐镜像

更多