HIVE自动刷表导数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 代码实例package mysql.to.hdfs.hive;import java.sql.Connection;import java.

代码实例

package mysql.to.hdfs.hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

//第一步,先把oracle数据抽取到hdfs目录,通过sqoop工具
//第二步,修改下面对应的链接,oracle链接即可
//第三步,执行程序,等程序执行完毕,数据都到了hyperbase表中,可验证
public class AutoImportDataToORC {
    private static String MYSQLUSERNAME = "root";
    private static String MYSQLPASSWORD = "Gepoint";
    private static String MYSQLDRIVER = "com.mysql.jdbc.Driver";
    private static String MYSQLURL = "jdbc:mysql://100.2.5.221:3307/dep_rk_db";

    private static String HIVEDRIVER = "org.apache.hive.jdbc.HiveDriver";
    private static String HIVEURL = "jdbc:hive2://100.2.5.2:10000/default";
    private static String HIVEUSERNAME = "hdfs";
    private static String HIVEPASSWORD = "d010";

    Connection mysqlconn = null;
    Statement mysqlpstm = null;
    ResultSet mysqlrs = null;

    Connection hiveconn = null;
    Statement hivepstm = null;
    ResultSet hivers = null;

    String sql1 = " ";
    String sql2 = " ";
    String sql3 = " ";
    String sql4 = " ";
    String sql5 = " ";
    String sql6 = " ";
    String MYSQLUSERNAME1 = MYSQLUSERNAME.replaceAll("001", "");

    public static void main(String[] args) throws Exception {
        AutoImportDataToORC aidth = new AutoImportDataToORC();
        aidth.getMYSQLConnection();
        aidth.MYSQLReleaseResource();
        aidth.getHiveConnection();
        aidth.HiveReleaseResource();
//      aidth.CreateExternalTable();
//      aidth.CreateOrcTable();
//      aidth.ImportDataToORC();
        System.out.println("程序已经执行完毕!请去waterdrop验证结果吧!!");
    }

    public void CreateExternalTable() {
        mysqlconn = getMYSQLConnection();
        hiveconn = getHiveConnection();
        String sql0 = "dfs -du /tmp/imp/" + MYSQLUSERNAME.toUpperCase();
        try {
            hivepstm = hiveconn.createStatement();
            mysqlpstm = mysqlconn.createStatement();
            hivers = hivepstm.executeQuery(sql0);
            int i = 0;
            while (hivers.next()) {
                String hdfsspace = hivers.getString(1);
                String a[] = hdfsspace.split("/", 2);
                String size = a[0];
                String tableNameAndFloder = a[1];
                String ss[] = tableNameAndFloder.split("/");
                String tableName = ss[3];
                if(Long.parseLong(size.trim()) !=  0L){
                i = i + 1;
                String tableName1 = tableName.replaceAll("\\$", "");
                String sql = "select c.table_name,\r\n" + " case when c.column_id=c.COLUMN_ID_MIN \r\n"
                        + " then 'create external table if not exists " + MYSQLUSERNAME1
                        + "_EX."+tableName1+" ( ' \r\n" + " else ', ' \r\n" + " end \r\n" + " ||c.sqltxt||\r\n"
                        + " case when c.column_id=c.COLUMN_ID_MAX \r\n"
                        + " then ') row format DELIMITED FIELDS terminated by ''\\001'' stored as textfile location ''/tmp/imp/' ||u.USERNAME||'/'|| \r\n"
                        + " c.table_name || \r\n" + " ''';' \r\n" + " else '' \r\n" + " end ,\r\n" + " c.column_id,\r\n"
                        + " c.COLUMN_ID_MIN,\r\n" + " c.COLUMN_ID_MAX\r\n" + "from ( \r\n" + "select table_name,\r\n"
                        + "'`' || column_name || '` ' || \r\n" + " case data_type\r\n"
                        + " when 'NUMBER' then ' decimal(' || data_length || ',' || nvl(data_scale,0) || ') '\r\n"
                        + " when 'FLOAT' then ' decimal(' || data_length || ',' || nvl(data_scale,0) || ') '\r\n"
                        + " when 'VARCHAR2' then ' string '\r\n" + " when 'NVARCHAR2' then ' string '\r\n"
                        + " when 'DATE' then ' string '\r\n" + " when 'INTEGER' then ' string '\r\n"
                        + " when 'CHAR' then ' string '\r\n" + " when 'CLOB' then ' string '\r\n"
                        + " when 'NCLOB' then ' string '\r\n" + " when 'BLOB' then ' string '\r\n"
                        + " when 'LONG RAW' then ' string '\r\n" + " when 'UNDEFINED' then ' string '\r\n"
                        + " when 'LONG' then ' string '\r\n" + " when 'Bit' then ' string '\r\n"
                        + " when 'TIMESTAMP(6)' then ' string '\r\n" + " when 'Boolean' then ' string '\r\n"
                        + " end sqlTxt\r\n" + " ,COLUMN_ID\r\n"
                        + " ,min(COLUMN_ID)over(partition by table_name) as COLUMN_ID_MIN\r\n"
                        + " ,max(COLUMN_ID)over(partition by table_name) as COLUMN_ID_MAX\r\n"
                        + " from user_tab_columns \r\n" + " where table_name = '" + tableName
                        + "' --order by COLUMN_ID asc \r\n" + " )c \r\n" + "left join user_users u on 1=1 \r\n"
                        + "order by c.table_name,c.COLUMN_ID asc";
                mysqlrs = mysqlpstm.executeQuery(sql);
                while (mysqlrs.next()) {
                    sql1 = mysqlrs.getString(2);
                    sql2 = sql2 + sql1;
                }
            }
            }
            sql3 = "create database IF NOT EXISTS " + MYSQLUSERNAME1 + "_ex";
            hivepstm.execute(sql3);
            System.out.println("-------------------建外表,一共" + i + "个表表结构的语句为:" + sql2);
            hivepstm.execute(sql2);
            System.out.println("----------------------------------------建外表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            hivepstm.close();
            mysqlpstm.close();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            HiveReleaseResource();
        }
    }

    public void CreateOrcTable() {
        mysqlconn = getMYSQLConnection();
        hiveconn = getHiveConnection();
        String sql0 = "dfs -du /tmp/imp/" + MYSQLUSERNAME.toUpperCase();
        int i = 0;
        try {
            hivepstm = hiveconn.createStatement();
            mysqlpstm = mysqlconn.createStatement();
            hivers = hivepstm.executeQuery(sql0);
            while (hivers.next()) {
                String hdfsspace = hivers.getString(1);
                String a[] = hdfsspace.split("/", 2);
                String size = a[0];
                String tableNameAndFloder = a[1];
                String ss[] = tableNameAndFloder.split("/");
                String tableName = ss[3];
                if(Long.parseLong(size.trim()) !=  0L){

                Long buckets = 0L;
                buckets = Long.parseLong(size.trim()) / (60 * 1024 * 1024) + 1;
                String sql_0 = "select column_name from user_tab_columns where table_name = '" + tableName.toUpperCase()
                        + "' and column_id = 1";
                mysqlrs = mysqlpstm.executeQuery(sql_0);
                String tableName1 = tableName.replaceAll("\\$", "");
                i = i + 1;
                while (mysqlrs.next()) {
                    String columnname = mysqlrs.getString(1);
                    String sql = "select c.table_name,\r\n" + " case when c.column_id=c.COLUMN_ID_MIN \r\n"
                            + " then 'create table if not exists " + MYSQLUSERNAME1 + "."+ tableName1 +"( ' \r\n"
                            + " else ', ' \r\n" + " end \r\n" + " ||c.sqltxt||\r\n"
                            + " case when c.column_id=c.COLUMN_ID_MAX \r\n" + " then ') clustered by (" + columnname
                            + ") into " + buckets + " buckets STORED AS ORC ;' \r\n" + " else '' \r\n" + " end ,\r\n"
                            + " c.column_id,\r\n" + " c.COLUMN_ID_MIN,\r\n" + " c.COLUMN_ID_MAX\r\n" + "from ( \r\n"
                            + "select table_name,\r\n" + " '`' || column_name || '` ' ||\r\n" + " case data_type\r\n"
                            + " when 'NUMBER' then ' decimal(' || data_length || ',' || nvl(data_scale,0) || ') '\r\n"
                            + " when 'FLOAT' then ' decimal(' || data_length || ',' || nvl(data_scale,0) || ') '\r\n"
                            + " when 'VARCHAR2' then ' string '\r\n" + " when 'NVARCHAR2' then ' string '\r\n"
                            + " when 'DATE' then ' date '\r\n" + " when 'INTEGER' then ' decimal(38,0)'\r\n"
                            + " when 'CHAR' then ' string '\r\n" + " when 'CLOB' then ' clob '\r\n"
                            + " when 'TIMESTAMP(6)' then ' timestamp '\r\n" + " when 'NCLOB' then ' clob '\r\n"
                            + " when 'LONG RAW' then ' string '\r\n" + " when 'LONG' then ' string '\r\n"
                            + " when 'UNDEFINED' then ' string '\r\n" + " when 'BLOB' then ' blob '\r\n"
                            + " when 'Bit' then ' boolean '\r\n" + " when 'Boolean' then ' boolean '\r\n"
                            + " end sqlTxt\r\n" + " ,COLUMN_ID\r\n"
                            + " ,min(COLUMN_ID)over(partition by table_name) as COLUMN_ID_MIN\r\n"
                            + " ,max(COLUMN_ID)over(partition by table_name) as COLUMN_ID_MAX\r\n"
                            + " from user_tab_columns \r\n" + " where table_name = '" + tableName
                            + "' --order by COLUMN_ID asc \r\n" + " )c \r\n" + "left join user_users u on 1=1 \r\n"
                            + "order by c.table_name,c.COLUMN_ID asc";

                    mysqlrs = mysqlpstm.executeQuery(sql);
                    while (mysqlrs.next()) {
                        sql4 = mysqlrs.getString(2);
                        sql5 = sql5 + sql4;
                    }
                }
            }
        }
            sql6 = "create database IF NOT EXISTS " + MYSQLUSERNAME1;
            System.out.println("-------------------建orc表库名的语句为:" + sql6);
            hivepstm.execute(sql6);
            System.out.println("-------------------建ORC表,一共" + i + "个表的表结构的语句为:" + sql5);
            hivepstm.execute(sql5);
            System.out.println("----------------------------------------建ORC表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
//          hivepstm.close();
//          oraclepstm.close();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            HiveReleaseResource();
        }
    }

    public void ImportDataToORC() {
        mysqlconn = getMYSQLConnection();
        hiveconn = getHiveConnection();
        String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc";
        int i = 0;
        try {
            mysqlpstm = mysqlconn.createStatement();
            mysqlrs = mysqlpstm.executeQuery(sql);
            hivepstm = hiveconn.createStatement();
            while (mysqlrs.next()) {
                i = i + 1;
                String table_name = mysqlrs.getString("table_name").replaceAll("\\$", "");
                String sql7 = "insert into " + MYSQLUSERNAME1 + "." + table_name + " select * from " + MYSQLUSERNAME1
                        + "_ex." + table_name;
                System.out.println("现在插入第"+i+"个表:"+sql7);
                hivepstm.execute(sql7);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            HiveReleaseResource();
        }
    }

    public Connection getMYSQLConnection() {
        try {
            Class.forName(MYSQLDRIVER);
            mysqlconn = DriverManager.getConnection(MYSQLURL, MYSQLUSERNAME, MYSQLPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }

        return mysqlconn;
    }

    public void MYSQLReleaseResource() {
        if (mysqlrs != null) {
            try {
                mysqlrs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mysqlpstm != null) {
            try {
                mysqlpstm.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mysqlconn != null) {
            try {
                mysqlconn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public Connection getHiveConnection() {
        try {
            Class.forName(HIVEDRIVER);
            hiveconn = DriverManager.getConnection(HIVEURL, HIVEUSERNAME, HIVEPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }
        return hiveconn;
    }

    public void HiveReleaseResource() {
        if (hivers != null) {
            try {
                hivers.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (hivepstm != null) {
            try {
                hivepstm.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (hiveconn != null) {
            try {
                hiveconn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3月前
|
SQL 分布式计算 Hadoop
创建hive表并关联数据
创建hive表并关联数据
53 0
|
3月前
|
SQL 存储 分布式计算
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
829 0
|
11天前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
31 6
|
1天前
|
SQL 存储 监控
Hive 插入大量数据
【8月更文挑战第15天】
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL DataWorks 监控
DataWorks产品使用合集之同步数据到Hive时,如何使用业务字段作为分区键
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
155 1
|
2月前
|
SQL 关系型数据库 MySQL
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
|
3月前
|
SQL 数据采集 存储
Hive实战 —— 电商数据分析(全流程详解 真实数据)
关于基于小型数据的Hive数仓构建实战,目的是通过分析某零售企业的门店数据来进行业务洞察。内容涵盖了数据清洗、数据分析和Hive表的创建。项目需求包括客户画像、消费统计、资源利用率、特征人群定位和数据可视化。数据源包括Customer、Transaction、Store和Review四张表,涉及多个维度的聚合和分析,如按性别、国家统计客户、按时间段计算总收入等。项目执行需先下载数据和配置Zeppelin环境,然后通过Hive进行数据清洗、建表和分析。在建表过程中,涉及ODS、DWD、DWT、DWS和DM五层,每层都有其特定的任务和粒度。最后,通过Hive SQL进行各种业务指标的计算和分析。
558 1
Hive实战 —— 电商数据分析(全流程详解 真实数据)