mysql全表抽取到MPP

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 代码实例package com.epoint.com.mysql_mpp_full;import java.io.FileNotFoundException;import java.

代码实例

package com.epoint.com.mysql_mpp_full;

import java.io.FileNotFoundException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class AutoMysqltoMPP {
    private static String MYSQLUSERNAME = "root";
    private static String MYSQLPASSWORD = "Gepoint";
    private static String MYSQLDRIVER = "com.mysql.jdbc.Driver";
    private static String MYSQLURL = "jdbc:mysql://100.2.5.221:3307/dep_fr_db";
    private static String MYSQLDATABASE = "dep_fr_db";

    private static String MPPDRIVER = "com.MPP.jdbc.Driver";
    private static String MPPURL = "jdbc:MPP://100.2.5.1:5258/dep_fr_db";
    private static String MPPUSERNAME = "mpp";
    private static String MPPPASSWORD = "h3c";

    private static Connection mysqlconn = null;
    private static Statement mysqlpstm = null;
    private static ResultSet mysqlrs = null;

    private static Connection mppconn = null;
    private static Statement mppstm = null;
    private static ResultSet mpprs = null;

    String sql1 = " ";
    String sql2 = " ";
    String sql3 = " ";
    String sql4 = " ";
    String sql5 = " ";
    String sql6 = " ";

    public static void main(String[] args) throws Exception {
        AutoMysqltoMPP aidth = new AutoMysqltoMPP();
        aidth.getMYSQLConnection();
        aidth.MYSQLReleaseResource();
        aidth.getMPPConnection();
        aidth.MPPReleaseResource();
        aidth.CreateMPPTable();
        // aidth.ImportDataToMPP();
        System.out.println("表已创建完毕,赶紧去查看表吧!!");

        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SS");
        TimeZone t = sdf.getTimeZone();
        t.setRawOffset(0);
        sdf.setTimeZone(t);
        Long startTime = System.currentTimeMillis();
        // 此段为要放置测取时间的函数
        mysqlconn = getMYSQLConnection();
        mppconn = getMPPConnection();
        mppstm = mppconn.createStatement();
        mysqlpstm = mysqlconn.createStatement();
        mppstm.execute("TRUNCATE table code_main");
        aidth.tableInput();

        Long endTime = System.currentTimeMillis();
        System.out.println("用时:" + sdf.format(new Date(endTime - startTime)));
    }

    public void CreateMPPTable() {
        mysqlconn = getMYSQLConnection();
        mppconn = getMPPConnection();
        try {
            mppstm = mppconn.createStatement();
            mysqlpstm = mysqlconn.createStatement();
            int i = 0;
            String sql = "SELECT table_schema\r\n" + "  ,table_name\r\n" + "    ,(\r\n" + "     CASE \r\n"
                    + "         WHEN ORDINAL_POSITION = mincol\r\n" + "             AND ORDINAL_POSITION < maxcol\r\n"
                    + "             THEN CONCAT (\"create  table if not exists \"\r\n"
                    + "                     ,table_schema\r\n" + "                      ,\".\"\r\n"
                    + "                     ,table_name\r\n" + "                        ,\"(`\"\r\n"
                    + "                     ,column_name\r\n" + "                       ,\"` \"\r\n"
                    + "                     ,COLUMN_TYPE\r\n" + "                       ,\",\"\r\n"
                    + "                     )\r\n" + "          WHEN ORDINAL_POSITION = mincol\r\n"
                    + "             AND ORDINAL_POSITION = maxcol\r\n"
                    + "             THEN CONCAT (\"create  table if not exists \"\r\n"
                    + "                     ,table_schema\r\n" + "                      ,\".\"\r\n"
                    + "                     ,table_name\r\n" + "                        ,\"(`\"\r\n"
                    + "                     ,column_name\r\n" + "                       ,\"` \"\r\n"
                    + "                     ,COLUMN_TYPE\r\n" + "                       ,\");\"\r\n"
                    + "                     )\r\n" + "          WHEN ORDINAL_POSITION > mincol\r\n"
                    + "             AND ORDINAL_POSITION < maxcol\r\n" + "              THEN CONCAT (\r\n"
                    + "                     \"`\"\r\n" + "                      ,column_name\r\n"
                    + "                     ,\"` \"\r\n" + "                        ,COLUMN_TYPE\r\n"
                    + "                     ,\",\"\r\n" + "                     )\r\n"
                    + "         WHEN ORDINAL_POSITION = maxcol\r\n" + "             THEN CONCAT (\r\n"
                    + "                     \"`\"\r\n" + "                      ,column_name\r\n"
                    + "                     ,\"` \"\r\n" + "                        ,COLUMN_TYPE\r\n"
                    + "                     ,\");\"\r\n" + "                        )\r\n" + "          END\r\n"
                    + "     ) AS statement\r\n" + " ,ORDINAL_POSITION\r\n" + "  ,maxcol\r\n" + "    ,mincol\r\n"
                    + "FROM (\r\n" + "  SELECT b.table_schema,b.table_name,b.ORDINAL_POSITION,b.column_name,\r\n"
                    + " (case\r\n" + "  when column_type = 'timestamp' then 'datetime'\r\n"
                    + " when column_type = 'bit(1)' then 'int(1)'\r\n" + "  else\r\n" + "       column_type\r\n"
                    + " end ) AS column_type\r\n" + "       ,a.maxcol\r\n" + "      ,a.mincol\r\n" + "  FROM (\r\n"
                    + "     SELECT table_schema\r\n" + "            ,table_name\r\n"
                    + "         ,max(ORDINAL_POSITION) maxcol\r\n" + "          ,min(ORDINAL_POSITION) mincol\r\n"
                    + "     FROM information_schema.COLUMNS\r\n" + "        GROUP BY table_schema\r\n"
                    + "         ,table_name\r\n" + "        ) a\r\n" + "    JOIN (\r\n" + "     SELECT table_schema\r\n"
                    + "         ,table_name\r\n" + "            ,ORDINAL_POSITION\r\n" + "          ,column_name\r\n"
                    + "         ,COLUMN_TYPE\r\n" + "       FROM information_schema.COLUMNS\r\n"
                    + "     ORDER BY table_schema\r\n" + "          ,table_name\r\n"
                    + "         ,ORDINAL_POSITION ASC\r\n" + "      ) b ON a.table_schema = b.table_schema\r\n"
                    + "     AND a.table_name = b.table_name\r\n" + "    ) c\r\n" + "WHERE table_schema = '"
                    + MYSQLDATABASE + "'";
            mysqlrs = mysqlpstm.executeQuery(sql);
            while (mysqlrs.next()) {
                sql1 = mysqlrs.getString(3);
                sql2 = sql2 + sql1;
            }

            sql3 = "create database IF NOT EXISTS " + MYSQLDATABASE;
            mppstm.execute(sql3);
            System.out.println("-------------------建mpp表,表结构的语句为:" + sql2);
            String[] sqls = sql2.split(";");
            for (String m : sqls) {
                mppstm.execute(m);
            }
            System.out.println("----------------------------------------建mpp表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            mppstm.close();
            mysqlpstm.close();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            MPPReleaseResource();
        }
    }

    public void ImportDataToMPP() {
        mysqlconn = getMYSQLConnection();
        mppconn = getMPPConnection();
        String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc";
        int i = 0;
        try {
            mysqlpstm = mysqlconn.createStatement();
            mysqlrs = mysqlpstm.executeQuery(sql);
            mppstm = mppconn.createStatement();
            while (mysqlrs.next()) {
                i = i + 1;
                String table_name = mysqlrs.getString("table_name").replaceAll("\\$", "");
                String sql7 = "insert into " + MYSQLDATABASE + "." + table_name + " select * from " + MYSQLDATABASE
                        + "_ex." + table_name;
                System.out.println("现在插入第" + i + "个表:" + sql7);
                mppstm.execute(sql7);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            MYSQLReleaseResource();
            MPPReleaseResource();
        }
    }

    public static List<List<String>> tableInput() throws FileNotFoundException, SQLException {
        List<List<String>> FindList = new ArrayList<List<String>>();
        mysqlconn = getMYSQLConnection();
        mppconn = getMPPConnection();
        mppstm = mppconn.createStatement();
        mysqlpstm = mysqlconn.createStatement();
        PreparedStatement pre = null;
        ResultSet resultSet = null;
        String sql = "SELECT CODEID,CODENAME,LEVNUM,CATEGORYNUM,description,isfromsoa  FROM code_main";
        try {
            pre = mysqlconn.prepareStatement(sql);
            resultSet = pre.executeQuery();
            String[] columu = { "CODEID","CODENAME","LEVNUM","CATEGORYNUM","description","isfromsoa"};
            int i = 0;
            while (resultSet.next()) {
                List<String> minList = new ArrayList<String>();
                for (String each : columu) {
                    minList.add(resultSet.getString(each));
                }
                FindList.add(minList);
                i++;
                if (i % 10000 == 0) { // 设置的每次提交大小为10000
                    executeManySql(FindList);
                    FindList.removeAll(FindList);
                    System.out.println(i);
                }
            }
            executeManySql(FindList);// 最后别忘了提交剩余的
            return FindList;
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                pre.close();// 关闭Statement
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                mppstm.close();
                mysqlpstm.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

//executeManySql(FindList)函数如下,因为数据量比较大,所以我设置的每次提交大小为10000,这样就不会内存溢出了。

    public static void executeManySql(List<List<String>> FindList) throws SQLException {
        mysqlconn = getMYSQLConnection();
        mppconn = getMPPConnection();
        mppstm = mppconn.createStatement();
        mysqlpstm = mysqlconn.createStatement();
        mppconn.setAutoCommit(false);
        Statement stat = null;

        PreparedStatement pst = (PreparedStatement) mppconn
                .prepareStatement("insert into code_main values (?,?,?,?,?,?)");
        for (List<String> minList : FindList) {
            for (int i = 0; i < minList.size(); i++) {
                pst.setString(i + 1, minList.get(i));
            }
            // 把一个SQL命令加入命令列表
            pst.addBatch();
        }
        // 执行批量更新
        pst.executeBatch();
        // 语句执行完毕,提交本事务
        mppconn.commit();
        pst.close();
        mppstm.close();
        mysqlpstm.close();// 一定要记住关闭连接,不然mysql回应为too many connection自我保护而断开。
    }//同时我还设置了计时的函数,可以看到这个从数据抽取到完成数据迁移的时间。

    public static Connection getMYSQLConnection() {
        try {
            Class.forName(MYSQLDRIVER);
            mysqlconn = DriverManager.getConnection(MYSQLURL, MYSQLUSERNAME, MYSQLPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }

        return mysqlconn;
    }

    public void MYSQLReleaseResource() {
        if (mysqlrs != null) {
            try {
                mysqlrs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mysqlpstm != null) {
            try {
                mysqlpstm.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mysqlconn != null) {
            try {
                mysqlconn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static Connection getMPPConnection() {
        try {
            Class.forName(MPPDRIVER);
            mppconn = DriverManager.getConnection(MPPURL, MPPUSERNAME, MPPPASSWORD);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }
        return mppconn;
    }

    public void MPPReleaseResource() {
        if (mpprs != null) {
            try {
                mpprs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mppstm != null) {
            try {
                mppstm.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (mppconn != null) {
            try {
                mppconn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
8月前
|
关系型数据库 MySQL Apache
**ADB MySQL湖仓版能够平滑迁移到湖仓**,阿里云提供了相应的迁移工具和服务来简化这一过程。
**ADB MySQL湖仓版能够平滑迁移到湖仓**,阿里云提供了相应的迁移工具和服务来简化这一过程。
353 2
|
关系型数据库 MySQL Linux
ADB MySQL湖仓版在Linux和Windows上的性能差异
ADB MySQL湖仓版在Linux和Windows上的性能差异
124 1
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
585 2
|
4月前
|
关系型数据库 MySQL Go
go抽取mysql配置到yaml配置文件
go抽取mysql配置到yaml配置文件
|
8月前
|
Cloud Native 关系型数据库 MySQL
云原生数据仓库产品使用合集之ADB MySQL湖仓版和 StarRocks 的使用场景区别,或者 ADB 对比 StarRocks 的优劣势
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
8月前
|
Cloud Native 关系型数据库 MySQL
云原生数据仓库产品使用合集之如何使用ADB MySQL湖仓版声纹特征提取服务
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
7月前
|
Cloud Native 关系型数据库 MySQL
云原生数据仓库AnalyticDB产品使用合集之如何修改云ADB MySQL版的默认LIMIT
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
104 21
|
7月前
|
缓存 DataWorks 关系型数据库
DataWorks产品使用合集之如何抽取MySQL视图数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
110 4
|
7月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何查看数据离线同步每天从MySQL抽取的数据量
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
Cloud Native 关系型数据库 MySQL
云原生数据仓库使用问题之如何将ADB中的数据导出到自建的MySQL数据库
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。