package epoint.mppdb_01.h3c;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class MySQLblobToMPPphoto {
// MySQL连接
public static Connection getMySQLConnection() throws Exception {
String MySQLDRIVER = "com.mysql.jdbc.Driver"
String MySQLURL = "jdbc:mysql://192.168.186.13:3306/bigdata_scene03_rktj"
String MySQLUSERNAME = "root"
String MySQLPASSWORD = "Gepoint"
Connection MySQLconn = DriverManager.getConnection(MySQLURL, MySQLUSERNAME, MySQLPASSWORD)
return MySQLconn
}
// MPP连接
public static Connection getMPPConnection() throws Exception {
String MPPDRIVER = "com.MPP.jdbc.Driver"
String MPPURL = "jdbc:MPP://192.168.186.14:5258/bigdata_scene03_rktj"
String MPPUSERNAME = "mpp"
String MPPPASSWORD = "h3c"
Connection MPPconn = DriverManager.getConnection(MPPURL, MPPUSERNAME, MPPPASSWORD)
return MPPconn
}
//
public static void getMySQLblobToHDFS() throws Exception {
Connection conn = getMySQLConnection()
ResultSet rs = null
try {
String sql = "select ROW_ID,photo from t_rk_baseinfo_blob limit 10"
Statement prest = conn.prepareStatement(sql)
rs = prest.executeQuery(sql)
while (rs.next()) {
int row_id = rs.getInt(1)
Blob photo = rs.getBlob(2)
System.out.println(row_id + " " + photo)
InputStream in = photo.getBinaryStream()
OutputStream out = new FileOutputStream("H:/photo/" + row_id + ".jpg")
int len = 0
byte[] buffer = new byte[1024]
while ((len = in.read(buffer)) != -1) {
out.write(buffer, 0, len)
}
upload("H:/photo/" + row_id + ".jpg")
}
prest.close()
rs.close()
} catch (Exception e) {
e.printStackTrace()
} finally {
// 关闭连接
if (conn != null) {
try {
conn.close()
conn = null
} catch (Exception e) {
e.printStackTrace()
}
}
}
}
public static void main(String[] args) throws Exception {
getMySQLblobToHDFS()
}
// HDFS附件上传
public static void upload(String uploadpath) throws Exception {
Configuration conf = new Configuration()
URI uri = new URI("hdfs://192.168.186.14:8020")
FileSystem fs = FileSystem.get(uri, conf, "HDFS")
Path resP = new Path(uploadpath)
Path destP = new Path("/photo")
if (!fs.exists(destP)) {
fs.mkdirs(destP)
}
fs.copyFromLocalFile(resP, destP)
fs.close()
System.out.println("***********************")
System.out.println("上传成功!")
}
// HDFS附件下载
public static void download() throws Exception {
Configuration conf = new Configuration()
String dest = "hdfs://192.168.186.14:/photo/11.png"
String local = "D://11.png"
FileSystem fs = FileSystem.get(URI.create(dest), conf, "hdfs")
FSDataInputStream fsdi = fs.open(new Path(dest))
OutputStream output = new FileOutputStream(local)
IOUtils.copyBytes(fsdi, output, 4096, true)
System.out.println("***********************")
System.out.println("下载成功!")
}
}