// Code example (代码实例)
package com.epoint.com.mysql_mpp_full;
import java.io.FileNotFoundException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
/**
 * Command-line utility that mirrors a MySQL schema into an MPP database over JDBC.
 *
 * <p>{@link #main(String[])} first checks that both databases are reachable, then
 * recreates the MySQL table definitions on the MPP side ({@link #CreateMPPTable()}),
 * truncates {@code code_main} on the MPP side and finally copies the rows of
 * {@code code_main} across in batches ({@link #tableInput()}).
 *
 * <p>All work methods open their own connections and close them before returning.
 * The static connection fields are only used by the public
 * {@code get*Connection()} / {@code *ReleaseResource()} pairs. This class is not
 * designed for concurrent use.
 */
public class AutoMysqltoMPP {
    // NOTE(review): connection settings and credentials are hard-coded in source;
    // consider loading them from configuration instead of committing them.
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "Gepoint";
    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_URL = "jdbc:mysql://100.2.5.221:3307/dep_fr_db";
    private static final String MYSQL_DATABASE = "dep_fr_db";
    private static final String MPP_DRIVER = "com.MPP.jdbc.Driver";
    private static final String MPP_URL = "jdbc:MPP://100.2.5.1:5258/dep_fr_db";
    private static final String MPP_USERNAME = "mpp";
    private static final String MPP_PASSWORD = "h3c";

    /** Columns of {@code code_main} that are copied, in insert order. */
    private static final String[] CODE_MAIN_COLUMNS = {
        "CODEID", "CODENAME", "LEVNUM", "CATEGORYNUM", "description", "isfromsoa"
    };
    private static final String SELECT_CODE_MAIN =
            "SELECT CODEID,CODENAME,LEVNUM,CATEGORYNUM,description,isfromsoa FROM code_main";
    private static final String INSERT_CODE_MAIN = "insert into code_main values (?,?,?,?,?,?)";

    /** Number of rows buffered in memory before they are flushed to the MPP database. */
    private static final int BATCH_SIZE = 10000;

    /**
     * Query that produces one DDL fragment per column of every table in
     * {@link #MYSQL_DATABASE}; concatenating the fragments in row order yields a
     * script of {@code create table if not exists ...;} statements. The column types
     * {@code timestamp} and {@code bit(1)} are mapped to {@code datetime} and
     * {@code int(1)}.
     *
     * <p>The outer {@code ORDER BY} is required: the fragments only form valid DDL
     * when they arrive grouped by table and sorted by column position, and the
     * ordering inside the derived table {@code b} is not guaranteed to survive the join.
     */
    private static final String CREATE_TABLE_DDL_QUERY =
            "SELECT table_schema, table_name,\n"
            + "  (CASE\n"
            + "     WHEN ORDINAL_POSITION = mincol AND ORDINAL_POSITION < maxcol\n"
            + "       THEN CONCAT (\"create table if not exists \", table_schema, \".\", table_name,"
            + " \"(`\", column_name, \"` \", COLUMN_TYPE, \",\")\n"
            + "     WHEN ORDINAL_POSITION = mincol AND ORDINAL_POSITION = maxcol\n"
            + "       THEN CONCAT (\"create table if not exists \", table_schema, \".\", table_name,"
            + " \"(`\", column_name, \"` \", COLUMN_TYPE, \");\")\n"
            + "     WHEN ORDINAL_POSITION > mincol AND ORDINAL_POSITION < maxcol\n"
            + "       THEN CONCAT (\"`\", column_name, \"` \", COLUMN_TYPE, \",\")\n"
            + "     WHEN ORDINAL_POSITION = maxcol\n"
            + "       THEN CONCAT (\"`\", column_name, \"` \", COLUMN_TYPE, \");\")\n"
            + "   END) AS statement,\n"
            + "  ORDINAL_POSITION, maxcol, mincol\n"
            + "FROM (\n"
            + "  SELECT b.table_schema, b.table_name, b.ORDINAL_POSITION, b.column_name,\n"
            + "    (case\n"
            + "       when column_type = 'timestamp' then 'datetime'\n"
            + "       when column_type = 'bit(1)' then 'int(1)'\n"
            + "       else column_type\n"
            + "     end) AS column_type,\n"
            + "    a.maxcol, a.mincol\n"
            + "  FROM (\n"
            + "    SELECT table_schema, table_name,\n"
            + "      max(ORDINAL_POSITION) maxcol, min(ORDINAL_POSITION) mincol\n"
            + "    FROM information_schema.COLUMNS\n"
            + "    GROUP BY table_schema, table_name\n"
            + "  ) a\n"
            + "  JOIN (\n"
            + "    SELECT table_schema, table_name, ORDINAL_POSITION, column_name, COLUMN_TYPE\n"
            + "    FROM information_schema.COLUMNS\n"
            + "    ORDER BY table_schema, table_name, ORDINAL_POSITION ASC\n"
            + "  ) b ON a.table_schema = b.table_schema AND a.table_name = b.table_name\n"
            + ") c\n"
            + "WHERE table_schema = '" + MYSQL_DATABASE + "'\n"
            + "ORDER BY table_name, ORDINAL_POSITION";

    /** Connection most recently handed out by {@link #getMYSQLConnection()}. */
    private static Connection mysqlconn = null;
    /** Connection most recently handed out by {@link #getMPPConnection()}. */
    private static Connection mppconn = null;

    // Package-visible scratch fields, retained so same-package code that reads them
    // keeps compiling. Only sql1..sql3 are written, by CreateMPPTable().
    String sql1 = " ";
    String sql2 = " ";
    String sql3 = " ";
    String sql4 = " ";
    String sql5 = " ";
    String sql6 = " ";

    /**
     * Runs the full migration: connectivity check, table creation, truncation of
     * {@code code_main} on the MPP side, batched copy of {@code code_main}, and a
     * final report of the elapsed time.
     *
     * @param args ignored
     * @throws Exception if truncating or copying {@code code_main} fails outright
     */
    public static void main(String[] args) throws Exception {
        AutoMysqltoMPP migrator = new AutoMysqltoMPP();

        // Fail fast: an unreachable database aborts here with a RuntimeException,
        // before any DDL is executed.
        getMYSQLConnection();
        migrator.MYSQLReleaseResource();
        getMPPConnection();
        migrator.MPPReleaseResource();

        migrator.CreateMPPTable();
        System.out.println("表已创建完毕,赶紧去查看表吧!!");

        long startTime = System.currentTimeMillis();
        try (Connection mpp = openMppConnection();
                Statement truncate = mpp.createStatement()) {
            truncate.execute("TRUNCATE table code_main");
        }
        tableInput();
        long endTime = System.currentTimeMillis();
        System.out.println("用时:" + formatElapsed(endTime - startTime));
    }

    /**
     * Recreates every table of {@link #MYSQL_DATABASE} on the MPP side.
     *
     * <p>The DDL script is generated by MySQL itself from
     * {@code information_schema.COLUMNS}, the target database is created if it does
     * not exist, and the script is then executed statement by statement on the MPP
     * connection. A {@link SQLException} is printed to standard error and otherwise
     * ignored; connection failures surface as {@link RuntimeException}.
     */
    public void CreateMPPTable() {
        try (Connection mysql = openMysqlConnection();
                Connection mpp = openMppConnection();
                Statement mysqlStatement = mysql.createStatement();
                Statement mppStatement = mpp.createStatement()) {
            // Built locally so that repeated calls do not replay DDL from earlier calls.
            StringBuilder ddlScript = new StringBuilder();
            try (ResultSet fragments = mysqlStatement.executeQuery(CREATE_TABLE_DDL_QUERY)) {
                while (fragments.next()) {
                    sql1 = fragments.getString(3);
                    if (sql1 != null) {
                        ddlScript.append(sql1);
                    }
                }
            }
            sql2 = ddlScript.toString();

            sql3 = "create database IF NOT EXISTS " + MYSQL_DATABASE;
            mppStatement.execute(sql3);

            System.out.println("-------------------建mpp表,表结构的语句为:" + sql2);
            // NOTE: a column type containing ';' (e.g. inside an enum literal) would be
            // split incorrectly here.
            for (String statement : sql2.split(";")) {
                // Skip blank pieces so an empty schema does not send an empty statement.
                if (!statement.trim().isEmpty()) {
                    mppStatement.execute(statement);
                }
            }
            System.out.println("----------------------------------------建mpp表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * For every table name returned by the source query, runs
     * {@code insert into <db>.<table> select * from <db>_ex.<table>} on the MPP side.
     * {@code $} characters are stripped from the table names first. A
     * {@link SQLException} is printed to standard error and stops the import.
     *
     * <p>NOTE(review): the source query reads {@code user_tables} / {@code num_rows},
     * which look like Oracle dictionary names, yet it is executed on the MySQL
     * connection - confirm that such a table or view exists there.
     */
    public void ImportDataToMPP() {
        String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc";
        try (Connection mysql = openMysqlConnection();
                Connection mpp = openMppConnection();
                Statement mysqlStatement = mysql.createStatement();
                ResultSet tables = mysqlStatement.executeQuery(sql);
                Statement mppStatement = mpp.createStatement()) {
            int tableCount = 0;
            while (tables.next()) {
                tableCount++;
                String table_name = tables.getString("table_name").replace("$", "");
                String sql7 = "insert into " + MYSQL_DATABASE + "." + table_name + " select * from "
                        + MYSQL_DATABASE + "_ex." + table_name;
                System.out.println("现在插入第" + tableCount + "个表:" + sql7);
                mppStatement.execute(sql7);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Copies all rows of {@code code_main} from MySQL to the MPP database, flushing
     * every {@link #BATCH_SIZE} rows and printing the running row count after each
     * full batch.
     *
     * @return the rows of the last (partial) batch, or {@code null} if a
     *         {@link SQLException} occurred (it is printed to standard error)
     * @throws FileNotFoundException never thrown; kept for signature compatibility
     * @throws SQLException kept for signature compatibility; SQL failures are
     *         reported through the {@code null} return instead
     */
    public static List<List<String>> tableInput() throws FileNotFoundException, SQLException {
        List<List<String>> FindList = new ArrayList<List<String>>();
        try (Connection mysql = openMysqlConnection();
                Connection mpp = openMppConnection();
                PreparedStatement select = mysql.prepareStatement(SELECT_CODE_MAIN);
                ResultSet rows = select.executeQuery()) {
            int rowCount = 0;
            while (rows.next()) {
                List<String> row = new ArrayList<String>(CODE_MAIN_COLUMNS.length);
                for (String column : CODE_MAIN_COLUMNS) {
                    row.add(rows.getString(column));
                }
                FindList.add(row);
                rowCount++;
                if (rowCount % BATCH_SIZE == 0) {
                    insertBatch(mpp, FindList);
                    FindList.clear();
                    System.out.println(rowCount);
                }
            }
            if (!FindList.isEmpty()) {
                insertBatch(mpp, FindList);
            }
            return FindList;
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Inserts the given rows into {@code code_main} on the MPP database as a single
     * batch inside one transaction, using a dedicated connection that is closed
     * before the method returns.
     *
     * @param FindList rows to insert, each holding the column values in insert order;
     *        {@code null} or an empty list is a no-op and opens no connection
     * @throws SQLException if the batch fails; the transaction is rolled back first
     */
    public static void executeManySql(List<List<String>> FindList) throws SQLException {
        if (FindList == null || FindList.isEmpty()) {
            return;
        }
        try (Connection mpp = openMppConnection()) {
            insertBatch(mpp, FindList);
        }
    }

    /**
     * Opens a new MySQL connection and remembers it so that
     * {@link #MYSQLReleaseResource()} can close it. A connection remembered by an
     * earlier call is not closed by this method.
     *
     * @return the newly opened connection
     * @throws RuntimeException if the driver class is missing or the connection fails
     */
    public static Connection getMYSQLConnection() {
        mysqlconn = openMysqlConnection();
        return mysqlconn;
    }

    /** Closes the connection last returned by {@link #getMYSQLConnection()}, if any. */
    public void MYSQLReleaseResource() {
        closeQuietly(mysqlconn);
        mysqlconn = null;
    }

    /**
     * Opens a new MPP connection and remembers it so that
     * {@link #MPPReleaseResource()} can close it. A connection remembered by an
     * earlier call is not closed by this method.
     *
     * @return the newly opened connection
     * @throws RuntimeException if the driver class is missing or the connection fails
     */
    public static Connection getMPPConnection() {
        mppconn = openMppConnection();
        return mppconn;
    }

    /** Closes the connection last returned by {@link #getMPPConnection()}, if any. */
    public void MPPReleaseResource() {
        closeQuietly(mppconn);
        mppconn = null;
    }

    /** Opens a MySQL connection without touching the shared static field. */
    private static Connection openMysqlConnection() {
        return openConnection(MYSQL_DRIVER, MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
    }

    /** Opens an MPP connection without touching the shared static field. */
    private static Connection openMppConnection() {
        return openConnection(MPP_DRIVER, MPP_URL, MPP_USERNAME, MPP_PASSWORD);
    }

    /** Loads the driver class and opens a connection, wrapping failures unchecked. */
    private static Connection openConnection(String driver, String url, String user, String password) {
        try {
            Class.forName(driver);
            return DriverManager.getConnection(url, user, password);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("class not find !", e);
        } catch (SQLException e) {
            throw new RuntimeException("get connection error!", e);
        }
    }

    /** Sends the rows as one batch on the given connection and commits, rolling back on failure. */
    private static void insertBatch(Connection mpp, List<List<String>> rows) throws SQLException {
        mpp.setAutoCommit(false);
        try (PreparedStatement insert = mpp.prepareStatement(INSERT_CODE_MAIN)) {
            for (List<String> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    insert.setString(i + 1, row.get(i));
                }
                insert.addBatch();
            }
            insert.executeBatch();
            mpp.commit();
        } catch (SQLException e) {
            try {
                mpp.rollback();
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
    }

    /** Closes the connection if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Formats a duration in milliseconds with the pattern {@code HH:mm:ss:SS}.
     * The value is rendered in UTC so the local zone cannot shift it; durations of
     * 24 hours or more wrap around because only the time-of-day part is printed.
     */
    private static String formatElapsed(long elapsedMillis) {
        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss:SS");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(elapsedMillis));
    }
}