HBase——使用Put迁移MySql数据到Hbase

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 先上code:/** * 功能:迁移mysql上电池历史数据到hbase * Created by liuhuichao on 2016/12/6. */public class MySqlToHBase { /** * 获取表 * @param...



先上code:



/**
 * 功能:迁移mysql上电池历史数据到hbase
 * Created by liuhuichao on 2016/12/6.
 */
public class MySqlToHBase {

    /**
     * 获取表
     * @param tableName
     * @return
     * @throws IOException
     */
    private  HTable connectHBase(String tableName) throws IOException{
        HTable table=null;
        Configuration conf= HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","lhc-centos");
        table=new HTable(conf,tableName);
        return table;
    }

    /**
     * 获取mysql连接
     * @return
     * @throws Exception
     */
    private  java.sql.Connection connectDB() throws  Exception{
        String userName="root";
        String password="root";
        String url="jdbc:mysql://10.0.1.42:3306/energy?useUnicode=true&characterEncoding=UTF-8";
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        java.sql.Connection conn=DriverManager .getConnection(url,userName,password);
        return  conn;
    }

    /**
     * 将电池历史数据表中输入导入HBase
     * @throws Exception
     */
    @Test
    public void exportFromMySqlToHBase() throws Exception{
        java.sql.Connection dbConn=null;
        HTable table=null;

        Statement stmt=null;
        String strQuery="SELECT * FROM `res_battery_data_history` limit 10000,1000000";
        dbConn=connectDB();//连接mysql
        table=connectHBase("batteryDataHistory");//连接HBase
        try{

            stmt=dbConn.createStatement();
            ResultSet rs=stmt.executeQuery(strQuery);
            long beginTime=System.currentTimeMillis();//开始
            System.out.println( "beginTime:---"+beginTime);
            while (rs.next()) {
                UUID uuid = UUID.randomUUID();
                String rowKey = uuid.toString();//作为行健
                Put put = new Put(Bytes.toBytes(rowKey));
                /**
                 * family:baseData
                 */
                // Integer id=rs.getInt("id");
                String batteryNo = rs.getString("battery_no");
                Integer batteryType = rs.getInt("battery_type");
                Float voltageDeviation = rs.getFloat("voltage_deviation");
                Float totalVoltage = rs.getFloat("total_voltage");
                Float temprature1 = rs.getFloat("temprature1");
                Float temprature2 = rs.getFloat("temprature2");
                Float chargeNum = rs.getFloat("charge_num");
                Float longtitude = rs.getFloat("longtitude");
                Float latitude = rs.getFloat("latitude");
                Float totalCurrent = rs.getFloat("total_current");
                Float soc = rs.getFloat("soc");

                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryNo"), Bytes.toBytes(batteryNo));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryType"), Bytes.toBytes(batteryType));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("voltageDeviation"), Bytes.toBytes(voltageDeviation));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalVoltage"), Bytes.toBytes(totalVoltage));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature1"), Bytes.toBytes(temprature1));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature2"), Bytes.toBytes(temprature2));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("chargeNum"), Bytes.toBytes(chargeNum));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("longtitude"), Bytes.toBytes(longtitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("latitude"), Bytes.toBytes(latitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalCurrent"), Bytes.toBytes(totalCurrent));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("soc"), Bytes.toBytes(soc));
                /**
                 * family:volumnData
                 */
                Float vol1 = rs.getFloat("vol1");
                Float vol2 = rs.getFloat("vol2");
                Float vol3 = rs.getFloat("vol3");
                Float vol4 = rs.getFloat("vol4");
                Float vol5 = rs.getFloat("vol5");
                Float vol6 = rs.getFloat("vol6");
                Float vol7 = rs.getFloat("vol7");
                Float vol8 = rs.getFloat("vol8");
                Float vol9 = rs.getFloat("vol9");
                Float vol10 = rs.getFloat("vol10");
                Float vol11 = rs.getFloat("vol11");
                Float vol12 = rs.getFloat("vol12");
                Float vol13 = rs.getFloat("vol13");
                Float vol14 = rs.getFloat("vol14");
                Float vol15 = rs.getFloat("vol15");
                Float vol16 = rs.getFloat("vol16");
                Float vol17 = rs.getFloat("vol17");
                Float vol18 = rs.getFloat("vol18");
                Float vol19 = rs.getFloat("vol19");
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v1"), Bytes.toBytes(vol1));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v2"), Bytes.toBytes(vol2));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v3"), Bytes.toBytes(vol3));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v4"), Bytes.toBytes(vol4));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v5"), Bytes.toBytes(vol5));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v6"), Bytes.toBytes(vol6));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v7"), Bytes.toBytes(vol7));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v8"), Bytes.toBytes(vol8));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v9"), Bytes.toBytes(vol9));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v10"), Bytes.toBytes(vol10));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v11"), Bytes.toBytes(vol11));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v12"), Bytes.toBytes(vol12));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v13"), Bytes.toBytes(vol13));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v14"), Bytes.toBytes(vol14));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v15"), Bytes.toBytes(vol15));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v16"), Bytes.toBytes(vol16));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v17"), Bytes.toBytes(vol17));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v18"), Bytes.toBytes(vol18));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v19"), Bytes.toBytes(vol19));
                /**
                 * family:extraData
                 */
                String remarks = rs.getString("remarks");
                String testUserName = rs.getString("test_user_name");
                String createTime = rs.getString("create_time");
                Integer createUser = rs.getInt("create_user");
                Integer source = rs.getInt("source");
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("remarks"), Bytes.toBytes(remarks));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("testUserName"), Bytes.toBytes(testUserName));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createTime"), Bytes.toBytes(createTime));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createUser"), Bytes.toBytes(createUser));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("source"), Bytes.toBytes(source));
                table.put(put);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try{
                if(stmt !=null){
                    stmt.close();
                }
                if(dbConn !=null){
                    dbConn.close();
                }
                if(table!=null){
                    table.close();
                }
                long endTime=System.currentTimeMillis();//开始
                System.out.println( "endTime:---"+endTime);
            }catch(Exception e){
                e.printStackTrace();
            }
        }








    }
}




   结果:10000 row(s) in 218.0310 seconds

      (我是单机版的HBase,未使用HDFS做底层存储。)

     刚开始的sql没有加limit,直接上来把测试表里面的三百多万条直接插出来,结果还没查完,就堆栈溢出了;额。。。后来先塞了一万条到HBase,额,还好,速度很快,但是感觉这种导入方式不具有通用性,对于大量数据的导入是不ok的。






相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
18天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
106 0
|
24天前
|
SQL 前端开发 关系型数据库
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
41 0
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
|
29天前
|
关系型数据库 MySQL 数据库
mysql 里创建表并插入数据
【10月更文挑战第5天】
110 1
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
47 3
|
3天前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
31 9
|
14天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
11天前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
26 1
|
12天前
|
SQL 关系型数据库 MySQL
mysql数据误删后的数据回滚
【11月更文挑战第1天】本文介绍了四种恢复误删数据的方法:1. 使用事务回滚,通过 `pymysql` 库在 Python 中实现;2. 使用备份恢复,通过 `mysqldump` 命令备份和恢复数据;3. 使用二进制日志恢复,通过 `mysqlbinlog` 工具恢复特定位置的事件;4. 使用延迟复制从副本恢复,通过停止和重启从库复制来恢复数据。每种方法都有详细的步骤和示例代码。
|
17天前
|
运维 关系型数据库 Java
DataKit6.0将MySQL8.0迁移至openGauss6.0
DataKit6.0将MySQL8.0迁移至openGauss6.0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
49 3