公司有个需求:为了减少运维人员的工作量,需要开发一个远程部署flume的工具。我这里使用比较方便、安全的SFTP协议,前提是被部署的目标机器上已安装并启动SSH服务。Unix、Linux、AIX系统基本都默认安装,Windows则需要自行安装。
其实在windows系统中还可以使用SMB文件共享协议来实现文件传输,还有比较通用的FTP协议、Telnet协议,但这里我们选用比较安全,操作方便的SFTP协议,其他的,我在下面也贴出来。
首先下载依赖jar包:jsch-0.1.52.jar。官网:http://www.jcraft.com/jsch/
然后编写工具类,具体看代码:
package com.mysite.stfp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;

import org.apache.log4j.Logger;

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;

/**
 * Wraps one JSch {@link Session} and the SFTP / exec channels opened on it.
 *
 * <p>The constructors try to connect immediately. A connection failure is logged
 * and not rethrown, so the object may exist without a usable session.
 */
public class SFTPChannel {
    private static final Logger LOG = Logger.getLogger(SFTPChannel.class.getName());

    private Session session = null;
    private ChannelSftp sftp = null;
    private ChannelExec exec = null;

    /**
     * Creates the session and connects with the given credentials.
     *
     * @param host     remote host name or IP
     * @param port     remote SSH port
     * @param userName login name
     * @param password login password; skipped when {@code null}
     * @param timeout  session timeout handed to {@code Session.setTimeout}
     */
    public SFTPChannel(String host, int port, String userName, String password, int timeout) {
        connect(host, port, userName, password, timeout);
    }

    /**
     * Creates the session from a parameter map with the keys {@code host},
     * {@code port}, {@code userName} and {@code password}.
     *
     * @param map     connection parameters; {@code host} and {@code userName} are required,
     *                {@code password} may be absent
     * @param timeout session timeout handed to {@code Session.setTimeout}
     * @throws NullPointerException  if {@code host} or {@code userName} is missing
     * @throws NumberFormatException if {@code port} is missing or not an integer
     */
    public SFTPChannel(Map<String, String> map, int timeout) {
        String host = required(map, "host");
        int port = Integer.parseInt(map.get("port") + "");
        String userName = required(map, "userName");
        // The password is optional: connect() only sets it when it is non-null.
        String password = map.get("password");
        connect(host, port, userName, password, timeout);
    }

    /** Returns the value stored under {@code key}, failing fast when it is absent. */
    private static String required(Map<String, String> map, String key) {
        String value = map.get(key);
        if (value == null) {
            throw new NullPointerException("missing connection parameter: " + key);
        }
        return value;
    }

    /** Shared connection logic of both constructors. */
    private void connect(String host, int port, String userName, String password, int timeout) {
        try {
            JSch jsch = new JSch();
            session = jsch.getSession(userName, host, port);
            LOG.debug("Session created.");
            if (password != null) {
                session.setPassword(password);
            }
            Properties config = new Properties();
            // NOTE(review): host key verification is disabled here, so the remote host's
            // identity is not checked. Kept as in the original; confirm this is acceptable.
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            session.setTimeout(timeout);
            session.connect();
            LOG.debug("Session connected.");
        } catch (JSchException e) {
            LOG.error("Failed to connect SSH session to " + host + ":" + port, e);
        }
    }

    /**
     * Opens and connects the SFTP channel.
     *
     * @return the connected channel; if opening fails, the previously stored channel
     *         (or {@code null} when none was opened before)
     */
    public ChannelSftp getSftp() {
        LOG.debug("Opening Channel.");
        try {
            Channel channel = session.openChannel("sftp");
            channel.connect();
            sftp = (ChannelSftp) channel;
        } catch (JSchException e) {
            LOG.error("Failed to open SFTP channel", e);
        }
        return sftp;
    }

    /**
     * Opens and connects an exec channel.
     *
     * @return the connected channel; if opening fails, the previously stored channel
     *         (or {@code null} when none was opened before)
     */
    public ChannelExec getExec() {
        LOG.debug("Opening Channel.");
        try {
            Channel channel = session.openChannel("exec");
            channel.setInputStream(null);
            channel.connect();
            exec = (ChannelExec) channel;
        } catch (JSchException e) {
            LOG.error("Failed to open exec channel", e);
        }
        return exec;
    }

    /**
     * Runs {@code command} on the remote host over a dedicated exec channel and copies
     * its standard output, line by line, to {@code System.out}. The remote standard
     * error is forwarded to {@code System.err}. Failures are logged, not rethrown.
     *
     * @param command the command line to execute remotely
     * @throws JSchException declared for compatibility with existing callers
     */
    public void execCmd(String command) throws JSchException {
        ChannelExec channel = null;
        BufferedReader reader = null;
        try {
            channel = (ChannelExec) session.openChannel("exec");
            channel.setCommand(command);
            channel.setInputStream(null);
            channel.setErrStream(System.err);
            // JSch documents that getInputStream() should be called before connect().
            InputStream stdout = channel.getInputStream();
            channel.connect();
            reader = new BufferedReader(new InputStreamReader(stdout));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            LOG.error("I/O error while executing remote command", e);
        } catch (JSchException e) {
            LOG.error("SSH error while executing remote command", e);
        } finally {
            // Both references may still be null if opening the channel failed.
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    LOG.error("Failed to close command output reader", e);
                }
            }
            if (channel != null) {
                channel.disconnect();
            }
        }
    }

    /** Logs the current remote working directory of the SFTP channel. */
    public void pwd() throws SftpException {
        LOG.info(sftp.pwd());
    }

    /** Logs the file names in the current remote directory. */
    public void ls() throws SftpException {
        ls(".");
    }

    /**
     * Logs the file names found under {@code path}.
     *
     * @param path remote path to list
     * @throws SftpException if the listing fails
     */
    public void ls(String path) throws SftpException {
        Vector<?> entries = sftp.ls(path);
        for (Object entry : entries) {
            if (entry instanceof LsEntry) {
                LOG.info(((LsEntry) entry).getFilename());
            }
        }
    }

    /** Closes the SFTP channel, the exec channel and the session, whichever exist. */
    public void closeChannel() {
        if (sftp != null) {
            sftp.quit();
            sftp.disconnect();
        }
        if (exec != null) {
            exec.disconnect();
        }
        if (session != null) {
            session.disconnect();
        }
    }

    /** Closes only the SFTP channel, if one was opened. */
    public void closeSftp() {
        if (sftp != null) {
            sftp.quit();
            sftp.disconnect();
        }
    }

    /** Closes only the exec channel, if one was opened. */
    public void closeExec() {
        if (exec != null) {
            exec.disconnect();
        }
    }
}
然后是调用方法:
/**
 * Uploads the local file {@code src} to the remote directory given by the request
 * parameter {@code path}, unpacks it there when it is a zip/tar archive, removes the
 * uploaded archive and finally runs the flume start script.
 *
 * <p>NOTE(review): {@code src}, {@code logger}, {@code getParas} and {@code decoder}
 * are defined outside this method. {@code path} is concatenated directly with file
 * names, so it presumably must end with a path separator - confirm with callers.
 *
 * @param request  carries the parameters {@code host}, {@code port}, {@code userName},
 *                 {@code password}, {@code path} and {@code os}
 * @param response unused
 * @return {@code "success"}, {@code "file create failed"} on an SFTP error, otherwise
 *         {@code "handle failed!"}
 */
@RequestMapping(value = "/sftp.do")
public String uploadBySftp(HttpServletRequest request, HttpServletResponse response) {
    String result = "handle failed!";
    SFTPChannel channel = null;
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("execute uploadBySftp method...");
        }
        Map<String, String> params = getParas(request);
        String path = params.get("path").toString();
        String os = params.get("os").toString();
        String userName = params.get("userName").toString();
        params.put("userName", decoder(userName));
        String password = params.get("password").toString();
        params.put("password", decoder(password));

        channel = new SFTPChannel(params, 60000);
        ChannelSftp chSftp = channel.getSftp();
        if (chSftp == null) {
            // getSftp() logs the failure and returns null; stop here rather than hit an NPE.
            logger.error("has a error:{}", "SFTP channel could not be opened");
            return result;
        }

        // Listing the directory doubles as an existence check; create it when that fails.
        try {
            channel.ls(path);
        } catch (SftpException e) {
            chSftp.mkdir(path);
        }

        File file = new File(src);
        long fileSize = file.length();
        chSftp.put(src, path, new FileProgressMonitor(fileSize), ChannelSftp.OVERWRITE);

        String fileName = file.getName();
        String remoteFile = path + fileName;
        // SECURITY NOTE(review): 'path' comes from the HTTP request and is concatenated
        // into shell commands unquoted. Validate or escape it before exposing this
        // endpoint to untrusted callers.
        if (file.isFile()) {
            if (fileName.endsWith(".zip") || fileName.endsWith(".ZIP")) {
                channel.execCmd("unzip -o " + remoteFile + " -d " + path);
            } else if (fileName.endsWith(".tar.gz")) {
                // -C extracts into 'path', matching the zip branch and the start script below.
                channel.execCmd("tar -zxvf " + remoteFile + " -C " + path);
            } else if (fileName.endsWith(".tar")) {
                // A plain tar archive is not gzip-compressed, so -z must not be passed.
                channel.execCmd("tar -xvf " + remoteFile + " -C " + path);
            }
        }
        chSftp.rm(remoteFile);

        if (!"windows".equals(os)) {
            channel.execCmd(path + "flume/bin/flume.sh");
        } else {
            channel.execCmd(path + "flume\\bin\\start.bat");
        }
        result = "success";
        return result;
    } catch (SftpException e) {
        logger.error("has a error:{}", e.getMessage(), e);
        result = "file create failed";
        return result;
    } catch (JSchException e) {
        logger.error("has a error:{}", e.getMessage(), e);
        return result;
    } finally {
        if (channel != null) {
            channel.closeChannel();
        }
    }
}
下面是实现SftpProgressMonitor接口的进度监控类,用于统计文件传输的百分比:
package com.mysite.stfp;

import java.text.DecimalFormat;
import java.util.Timer;
import java.util.TimerTask;

import com.jcraft.jsch.SftpProgressMonitor;

/**
 * Progress monitor for SFTP transfers. It accumulates the byte counts reported through
 * {@link #count(long)} and prints the progress once per interval from a timer thread.
 *
 * <p>{@code count}/{@code end} are called by the transfer, {@code run} by the timer
 * thread, so all shared state is accessed under the instance lock.
 *
 * <p>The instance is its own {@link TimerTask}, so it can be scheduled only once;
 * calling {@link #start()} again after {@link #stop()} fails in {@code Timer.schedule}.
 */
public class FileProgressMonitor extends TimerTask implements SftpProgressMonitor {
    private long progressInterval = 1000; // reporting period in milliseconds
    private boolean isEnd = false;        // whether the transfer has finished
    private long transfered;              // bytes transferred so far
    private long fileSize;                // total size of the file, 0 when unknown
    private Timer timer;                  // timer driving run()
    private boolean isScheduled = false;  // whether the timer has been started

    /**
     * @param fileSize total number of bytes expected; with 0 the raw byte count is
     *                 reported instead of a percentage
     */
    public FileProgressMonitor(long fileSize) {
        this.fileSize = fileSize;
    }

    /**
     * Timer callback. Prints the progress while the transfer is running, marks the
     * transfer as ended once all bytes have arrived, and stops the timer on the first
     * tick after the end.
     */
    @Override
    public void run() {
        if (isEnd()) {
            stop();
            return;
        }
        long current = getTransfered();
        if (current != fileSize) {
            sendProgressMessage(current);
        } else {
            System.out.println("File transfering is done.");
            setEnd(true);
        }
    }

    /** Cancels the timer, if it is running. */
    public synchronized void stop() {
        if (timer != null) {
            timer.cancel();
            timer.purge();
            timer = null;
            isScheduled = false;
        }
        System.out.println("Progress monitor stoped.");
    }

    /** Schedules this task on a timer: first tick after one second, then every interval. */
    public synchronized void start() {
        if (timer == null) {
            // Daemon thread: a transfer that aborts without end() being called must not
            // keep the JVM alive through a forgotten timer thread.
            timer = new Timer(true);
        }
        timer.schedule(this, 1000, progressInterval);
        isScheduled = true;
        System.out.println("Progress monitor started.");
    }

    /**
     * Prints the progress, as a percentage when the file size is known.
     *
     * @param transfered bytes transferred so far
     */
    private void sendProgressMessage(long transfered) {
        if (fileSize != 0) {
            double percent = ((double) transfered * 100) / (double) fileSize;
            DecimalFormat df = new DecimalFormat("#.##");
            System.out.println("Sending progress message: " + df.format(percent) + "%");
        } else {
            System.out.println("Sending progress message: " + transfered);
        }
    }

    /**
     * Records {@code count} newly transferred bytes, starting the timer on first use.
     *
     * @param count number of bytes transferred since the previous call
     * @return {@code false} once the transfer has been marked as ended, otherwise {@code true}
     */
    @Override
    public synchronized boolean count(long count) {
        if (isEnd) {
            return false;
        }
        if (!isScheduled) {
            start();
        }
        transfered = transfered + count;
        return true;
    }

    /** Marks the transfer as ended; the timer stops itself on its next tick. */
    @Override
    public void end() {
        setEnd(true);
        System.out.println("transfering end.");
    }

    private synchronized long getTransfered() {
        return transfered;
    }

    /** Overwrites the number of bytes counted as transferred. */
    public synchronized void setTransfered(long transfered) {
        this.transfered = transfered;
    }

    private synchronized void setEnd(boolean isEnd) {
        this.isEnd = isEnd;
    }

    private synchronized boolean isEnd() {
        return isEnd;
    }

    /** Not used: the file size is supplied through the constructor instead. */
    @Override
    public void init(int op, String src, String dest, long max) {
        // intentionally empty
    }
}
------------------------------------------------------------------------------------------------
以上是SFTP协议的一些方法,下面介绍smb协议操作,依赖的包是jcifs-1.3.18.jar:
package com.mysite.smb;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
import jcifs.smb.SmbFileOutputStream;

/**
 * Helpers for copying files from and to SMB shares with jcifs.
 *
 * <p>URL format: {@code smb://192.168.75.204/test/file.txt}, or with credentials
 * {@code smb://username:password@192.168.0.77/test}.
 *
 * @author T430
 */
public class SmbUtil {

    /** Chunk size used by all copy loops. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Writes the content of a remote file to {@code System.out}, followed by the number
     * of bytes read.
     *
     * @param remoteUrl SMB URL of the file to read
     * @throws IOException if the file cannot be opened or read
     */
    public static void smbGet(String remoteUrl) throws IOException {
        InputStream in = null;
        try {
            in = new SmbFileInputStream(new SmbFile(remoteUrl));
            // Stream in fixed chunks; a buffer of the whole file size never terminates
            // for an empty file, because reading into a zero-length array returns 0.
            long total = copy(in, System.out);
            System.out.println(total);
        } finally {
            closeQuietly(in);
        }
    }

    /**
     * Downloads a file from a share into a local directory, keeping the remote file name.
     *
     * @param remoteUrl SMB URL of the file to download
     * @param localDir  local directory to write the file into
     * @throws IOException if reading the remote file or writing the local file fails
     */
    public static void smbGet(String remoteUrl, String localDir) throws IOException {
        InputStream in = null;
        OutputStream out = null;
        try {
            SmbFile remoteFile = new SmbFile(remoteUrl);
            File localFile = new File(localDir + File.separator + remoteFile.getName());
            in = new BufferedInputStream(new SmbFileInputStream(remoteFile));
            out = new BufferedOutputStream(new FileOutputStream(localFile));
            copy(in, out);
            // Flush explicitly so a failed write is reported, not lost in a quiet close.
            out.flush();
        } finally {
            closeQuietly(out);
            closeQuietly(in);
        }
    }

    /**
     * Uploads a local file into a shared directory, keeping the local file name.
     *
     * @param remoteUrl     SMB URL of the target directory
     * @param localFilePath path of the local file to upload
     * @throws IOException if reading the local file or writing the remote file fails
     */
    public static void smbPut(String remoteUrl, String localFilePath) throws IOException {
        InputStream in = null;
        OutputStream out = null;
        try {
            File localFile = new File(localFilePath);
            SmbFile remoteFile = new SmbFile(remoteUrl + "/" + localFile.getName());
            in = new BufferedInputStream(new FileInputStream(localFile));
            out = new BufferedOutputStream(new SmbFileOutputStream(remoteFile));
            copy(in, out);
            // Flush explicitly so a failed write is reported, not lost in a quiet close.
            out.flush();
        } finally {
            closeQuietly(out);
            closeQuietly(in);
        }
    }

    /**
     * Copies everything from {@code in} to {@code out}, writing only the bytes each read
     * actually delivered.
     *
     * @return the number of bytes copied
     */
    private static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    /** Closes {@code resource} if it is non-null; a failure is reported but not propagated. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            System.err.println("Failed to close stream: " + e);
        }
    }
}
FTP的操作,依赖包commons-net-3.3.jar:
package com.mysite.ftp;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

/**
 * FTP upload and download helpers built on commons-net.
 *
 * <p>Each call uses its own {@link FTPClient}, so concurrent calls do not share a
 * connection.
 */
public class FtpUtil {

    /**
     * Uploads {@code src} using connection settings taken from a map.
     *
     * @param map connection settings: {@code host}, {@code userName} and {@code password}
     *            are required; {@code path} defaults to {@code ""} and {@code port} to 21
     * @param src local file or directory to upload
     */
    public static void upload(Map<String, Object> map, String src) {
        String host = map.get("host").toString();
        String userName = map.get("userName").toString();
        String password = map.get("password").toString();
        String path = map.get("path") == null ? "" : map.get("path").toString();
        String port = map.get("port") == null ? "21" : map.get("port").toString();
        upload(host, Integer.parseInt(port), userName, password, path, src);
    }

    /**
     * Uploads a local file, or a directory with everything below it, to an FTP server.
     * Nothing is uploaded when the server refuses the connection or the login.
     *
     * @param host     server address
     * @param port     server port
     * @param userName login name
     * @param password login password
     * @param path     remote directory to upload into
     * @param src      local file or directory to upload
     * @throws RuntimeException if an I/O error occurs during the transfer or while disconnecting
     */
    public static void upload(String host, int port, String userName, String password, String path,
            String src) {
        FTPClient ftp = new FTPClient();
        boolean completed = false;
        try {
            if (connect(ftp, path, host, port, userName, password)) {
                upload(ftp, new File(src));
            }
            completed = true;
        } catch (IOException e) {
            throw new RuntimeException("FTP客户端出错!", e);
        } finally {
            disconnect(ftp, completed);
        }
    }

    /**
     * Connects, logs in and switches to the target directory.
     *
     * @param ftp      client to connect
     * @param path     remote directory to change into
     * @param addr     server address
     * @param port     server port
     * @param username login name
     * @param password login password
     * @return {@code true} when the connection is ready for transfers, {@code false} when
     *         the server refused the connection or the login
     * @throws IOException on network failure
     */
    private static boolean connect(FTPClient ftp, String path, String addr, int port, String username,
            String password) throws IOException {
        // The control encoding only takes effect when it is set before connecting.
        ftp.setControlEncoding("GBK");
        ftp.connect(addr, port);
        if (!FTPReply.isPositiveCompletion(ftp.getReplyCode())) {
            System.err.println("FTP server refused connection: " + ftp.getReplyString());
            ftp.disconnect();
            return false;
        }
        if (!ftp.login(username, password)) {
            System.err.println("FTP login failed: " + ftp.getReplyString());
            ftp.disconnect();
            return false;
        }
        ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
        ftp.changeWorkingDirectory(path);
        ftp.setBufferSize(1024);
        return true;
    }

    /**
     * Uploads {@code file} into the current remote directory. A directory is created
     * remotely and uploaded recursively; the remote working directory is left inside
     * the directory that was passed in.
     *
     * @param ftp  connected client
     * @param file local file or directory to upload
     * @throws IOException on transfer failure
     */
    private static void upload(FTPClient ftp, File file) throws IOException {
        if (!file.isDirectory()) {
            store(ftp, file);
            return;
        }
        ftp.makeDirectory(file.getName());
        ftp.changeWorkingDirectory(file.getName());
        // listFiles() builds child paths with the platform separator and returns null
        // when the directory cannot be read.
        File[] children = file.listFiles();
        if (children == null) {
            return;
        }
        for (File child : children) {
            if (child.isDirectory()) {
                upload(ftp, child);
                ftp.changeToParentDirectory();
            } else {
                store(ftp, child);
            }
        }
    }

    /** Stores one local file under its own name, always closing the local stream. */
    private static void store(FTPClient ftp, File file) throws IOException {
        FileInputStream input = new FileInputStream(file);
        try {
            ftp.storeFile(file.getName(), input);
        } finally {
            input.close();
        }
    }

    /**
     * Disconnects the client. A disconnect failure is raised only when the main
     * operation succeeded, so that it cannot hide the original exception.
     */
    private static void disconnect(FTPClient ftp, boolean completed) {
        try {
            ftp.disconnect();
        } catch (IOException e) {
            if (completed) {
                throw new RuntimeException("关闭FTP连接发生异常!", e);
            }
            System.err.println("Failed to disconnect FTP client: " + e);
        }
    }

    /**
     * Downloads one remote file over the default FTP port.
     *
     * @param host     server address
     * @param userName login name
     * @param password login password
     * @param path     remote file to download
     * @param src      local file to write
     * @throws RuntimeException if an I/O error occurs during the transfer or while disconnecting
     */
    public static void download(String host, String userName, String password, String path, String src) {
        FTPClient ftpClient = new FTPClient();
        FileOutputStream fos = null;
        boolean completed = false;
        try {
            ftpClient.connect(host);
            ftpClient.login(userName, password);
            fos = new FileOutputStream(src);
            ftpClient.setBufferSize(1024);
            // Binary mode keeps the file content unchanged during transfer.
            ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
            ftpClient.retrieveFile(path, fos);
            completed = true;
        } catch (IOException e) {
            throw new RuntimeException("FTP客户端出错!", e);
        } finally {
            IOUtils.closeQuietly(fos);
            disconnect(ftpClient, completed);
        }
    }
}
Telnet协议,依赖包commons-net-3.3.jar:
package com.mysite.telnet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.SocketException;

import org.apache.commons.net.telnet.TelnetClient;

/**
 * Minimal Telnet helper: logs in, sends one command and echoes the remote output.
 *
 * <p>The connection streams and prompt are static fields, so only one session can be
 * handled at a time.
 */
public class TelnetUtil {
    private static InputStream in;
    private static PrintStream out;
    private static String prompt = "#";
    private static String os = "windows";

    /**
     * Connects to {@code host}, logs in, starts the flume controller with a fixed
     * command line and prints the remote output until the stream ends.
     *
     * @param host remote host
     * @param port Telnet port
     * @param os   {@code "windows"} selects the VT220 terminal type and Windows prompts
     * @param user login name
     * @param pwd  login password
     */
    public static void handle(String host, int port, String os, String user, String pwd) {
        TelnetClient telnet = null;
        try {
            // Always record the target OS; login() uses it to pick the password prompt,
            // and a stale value from an earlier call would select the wrong one.
            TelnetUtil.os = os;
            if ("windows".equals(os)) {
                telnet = new TelnetClient("VT220");
                prompt = "";
            } else {
                telnet = new TelnetClient();
                prompt = user.equals("root") ? "#" : "$";
            }
            telnet.connect(host, port);
            telnet.setSoTimeout(60000);
            in = telnet.getInputStream();
            out = new PrintStream(telnet.getOutputStream());
            login(user, pwd);
            sendCommand("java -jar \"D:/ftp/flume/lib/controller.jar\" \"D:/ftp/flume/lib/config.properties\"");
            readUntil();
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (telnet != null) {
                    telnet.disconnect();
                }
            } catch (IOException e) {
                new Exception("telnet 关闭失败", e).printStackTrace();
            }
        }
    }

    /** Same as {@link #handle(String, int, String, String, String)} on port 23. */
    public static void handle(String host, String os, String user, String pwd) {
        handle(host, 23, os, user, pwd);
    }

    /** Answers the login and password prompts, then waits for the shell prompt. */
    private static void login(String user, String password) {
        readUntil("login:");
        write(user);
        if ("windows".equals(os)) {
            readUntil("password:");
        } else {
            readUntil("Password:");
        }
        write(password, false);
        readUntil(prompt + " ");
    }

    /** Switches to root with {@code su} and adopts the {@code #} prompt. */
    private static void su(String password) {
        try {
            write("su");
            readUntil("Password: ");
            write(password, false);
            prompt = "#";
            readUntil(prompt + " ");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Sends {@code value} to the remote side and echoes it on the local console. */
    private static void write(String value) {
        write(value, true);
    }

    /**
     * Sends {@code value} to the remote side.
     *
     * @param echo whether to print the value locally; passwords are sent without echo
     */
    private static void write(String value, boolean echo) {
        try {
            out.println(value);
            out.flush();
            if (echo) {
                System.out.println(value);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a command and reads the output up to the next prompt.
     *
     * @param command command line to send
     * @return the text read up to and including the prompt, or {@code null} on failure
     */
    public static String sendCommand(String command) {
        try {
            write(command);
            return readUntil(prompt + " ");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Prints everything the remote side sends, decoded as GBK, until the stream ends or
     * fails, then closes the stream.
     *
     * @return always {@code null}
     */
    private static String readUntil() {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new InputStreamReader(in, "GBK"));
            int ch;
            while ((ch = reader.read()) != -1) {
                System.out.print((char) ch);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * Reads from the remote side until the received text ends with {@code pattern}.
     *
     * @param pattern text to wait for
     * @return the received text re-decoded as GBK when the pattern was found; the raw
     *         text read so far when the stream ended or failed first
     */
    private static String readUntil(String pattern) {
        StringBuilder received = new StringBuilder();
        try {
            char lastChar = pattern.charAt(pattern.length() - 1);
            int b;
            // Stop at end of stream; casting -1 to char would never match and loop forever.
            while ((b = in.read()) != -1) {
                char ch = (char) b;
                received.append(ch);
                if (ch == lastChar && endsWith(received, pattern)) {
                    // Each byte was stored as one char; re-decode as GBK to fix garbled text.
                    byte[] raw = received.toString().getBytes("iso8859-1");
                    return new String(raw, "GBK");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return received.toString();
    }

    /** Checks the tail of {@code text} without converting the whole buffer to a string. */
    private static boolean endsWith(StringBuilder text, String suffix) {
        int start = text.length() - suffix.length();
        return start >= 0 && text.substring(start).equals(suffix);
    }

    public static void main(String[] args) {
        TelnetUtil.handle("127.0.0.1", "windows", "bob", "123");
    }
}