使用JSch远程部署flume采集点

简介: 使用JSch远程部署flume采集点

公司有个需求,为了减少运维人员的工作量,需要开发一个远程部署flume的工具。我这里使用比较方便安全的SFTP协议,前提是需要客户端安装有SSH。Unix、linux、aix系统基本默认安装,windows的要自行安装。

其实在windows系统中还可以使用SMB文件共享协议来实现文件传输,还有比较通用的FTP协议、Telnet协议,但这里我们选用比较安全,操作方便的SFTP协议,其他的,我在下面也贴出来。

首先下载依赖jar包:jsch-0.1.52.jar。官网:http://www.jcraft.com/jsch/

然后编写工具类,具体看代码:

package com.mysite.stfp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import org.apache.log4j.Logger;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
public class SFTPChannel {
  private static final Logger LOG = Logger.getLogger(SFTPChannel.class.getName());
  private Session session = null;
  private ChannelSftp sftp = null;
  private ChannelExec exec = null;
  public SFTPChannel(String host, int port, String userName, String password, int timeout) {
    try {
      JSch jsch = new JSch(); // 创建JSch对象
      session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象
      LOG.debug("Session created.");
      if (password != null) {
        session.setPassword(password); // 设置密码
      }
      Properties config = new Properties();
      config.put("StrictHostKeyChecking", "no");
      session.setConfig(config); // 为Session对象设置properties
      session.setTimeout(timeout); // 设置timeout时间
      session.connect(); // 通过Session建立链接
      LOG.debug("Session connected.");
    } catch (JSchException e) {
      e.printStackTrace();
    }
  }
  public SFTPChannel(Map<String, String> map, int timeout) {
    try {
      String host = map.get("host").toString();
      int port = Integer.parseInt(map.get("port") + "");
      String userName = map.get("userName").toString();
      String password = map.get("password").toString();
      JSch jsch = new JSch(); // 创建JSch对象
      session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象
      LOG.debug("Session created.");
      if (password != null) {
        session.setPassword(password); // 设置密码
      }
      Properties config = new Properties();
      config.put("StrictHostKeyChecking", "no");
      session.setConfig(config); // 为Session对象设置properties
      session.setTimeout(timeout); // 设置timeout时间
      session.connect(); // 通过Session建立链接
      LOG.debug("Session connected.");
    } catch (JSchException e) {
      e.printStackTrace();
    }
  }
  /**
   * 打开SFTP通道
   */
  public ChannelSftp getSftp() {
    LOG.debug("Opening Channel.");
    try {
      Channel channel = session.openChannel("sftp");// 打开SFTP通道
      channel.connect(); // 建立SFTP通道的连接
      sftp = (ChannelSftp) channel;
    } catch (JSchException e) {
      e.printStackTrace();
    }
    return sftp;
  }
  /**
   * 打开exec通道
   */
  public ChannelExec getExec() {
    LOG.debug("Opening Channel.");
    try {
      Channel channel = session.openChannel("exec");// 打开SFTP通道
      channel.setInputStream(null);
      channel.connect(); // 建立SFTP通道的连接
      exec = (ChannelExec) channel;
    } catch (JSchException e) {
      e.printStackTrace();
    }
    return exec;
  }
  public void execCmd(String command) throws JSchException {
    BufferedReader reader = null;
    Channel channel = null;
    try {
      channel = session.openChannel("exec");
      ((ChannelExec) channel).setCommand(command);
      channel.setInputStream(null);
      ((ChannelExec) channel).setErrStream(System.err);
      channel.connect();
      reader = new BufferedReader(new InputStreamReader(channel.getInputStream()));
      String buf = null;
      while ((buf = reader.readLine()) != null) {
        System.out.println(buf);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (JSchException e) {
      e.printStackTrace();
    } finally {
      try {
        reader.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
      channel.disconnect();
    }
  }
  public void pwd() throws SftpException {
    LOG.info(sftp.pwd());
  }
  public void ls() throws SftpException {
    ls(".");
  }
  public void ls(String path) throws SftpException {
    Vector<?> vector = sftp.ls(path);
    for (Object object : vector) {
      if (object instanceof LsEntry) {
        LsEntry entry = LsEntry.class.cast(object);
        LOG.info(entry.getFilename());
      }
    }
  }
  public void closeChannel() {
    if (sftp != null){
      sftp.quit();
      sftp.disconnect();
    }
    if (exec != null)
      exec.disconnect();
    if (session != null) {
      session.disconnect();
    }
  }
  public void closeSftp() {
    if (sftp != null){
      sftp.quit();
      sftp.disconnect();
    }
  }
  public void closeExec() {
    if (exec != null)
      exec.disconnect();
  }
}


然后是调用方法:

@RequestMapping(value = "/sftp.do")
  public String uploadBySftp(HttpServletRequest request, HttpServletResponse response) {
    String result = "handle failed!";
    SFTPChannel channel = null;
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("execute uploadBySftp method...");
      }
      Map<String, String> params = getParas(request);
      String path = params.get("path").toString();
      String os = params.get("os").toString();
      String userName = params.get("userName").toString();
      params.put("userName", decoder(userName));
      String password = params.get("password").toString();
      params.put("password", decoder(password));
      channel = new SFTPChannel(params, 60000);
      ChannelSftp chSftp = channel.getSftp();
      try {
        channel.ls(path);
      } catch (Exception e) {
        chSftp.mkdir(path);
      }
      File file = new File(src);
      long fileSize = file.length();
      chSftp.put(src, path, new FileProgressMonitor(fileSize), ChannelSftp.OVERWRITE);
      String fileName = file.getName();
      if (file.isFile() && (fileName.endsWith(".zip") || fileName.endsWith(".ZIP")))
        channel.execCmd("unzip -o " + path + fileName + " -d " + path);
      else if (file.isFile() && (fileName.endsWith(".tar") || fileName.endsWith(".tar.gz")))
        channel.execCmd("tar -zxvf " + path + fileName);
      chSftp.rm(path + file.getName());
      if (!"windows".equals(os)) {
        channel.execCmd(path + "flume/bin/flume.sh");
      } else {
        channel.execCmd(path + "flume\\bin\\start.bat");
      }
      result = "success";
      return result;
    } catch (SftpException e) {
      logger.error("has a error:{}", e.getMessage());
      e.printStackTrace();
      result = "file create failed";
      return result;
    } catch (JSchException e) {
      logger.error("has a error:{}", e.getMessage());
      e.printStackTrace();
      return result;
    } finally {
      if (channel != null)
        channel.closeChannel();
    }
  }

下面是重写的文件传输的百分比统计:

package com.mysite.stfp;
import java.text.DecimalFormat;
import java.util.Timer;
import java.util.TimerTask;
import com.jcraft.jsch.SftpProgressMonitor;
public class FileProgressMonitor extends TimerTask implements SftpProgressMonitor {
  private long progressInterval = 1000; // 默认间隔时间为1秒
  private boolean isEnd = false; // 记录传输是否结束
  private long transfered; // 记录已传输的数据总大小
  private long fileSize; // 记录文件总大小
  private Timer timer; // 定时器对象
  private boolean isScheduled = false; // 记录是否已启动timer记时器
  public FileProgressMonitor(long fileSize) {
    this.fileSize = fileSize;
  }
  @Override
  public void run() {
    if (!isEnd()) { // 判断传输是否已结束
      //System.out.println("Transfering is in progress.");
      long transfered = getTransfered();
      if (transfered != fileSize) { // 判断当前已传输数据大小是否等于文件总大小
        //System.out.println("Current transfered: " + transfered + " bytes");
        sendProgressMessage(transfered);
      } else {
        System.out.println("File transfering is done.");
        setEnd(true); // 如果当前已传输数据大小等于文件总大小,说明已完成,设置end
      }
    } else {
      //System.out.println("Transfering done. Cancel timer.");
      stop(); // 如果传输结束,停止timer记时器
      return;
    }
  }
  public void stop() {
    //System.out.println("Try to stop progress monitor.");
    if (timer != null) {
      timer.cancel();
      timer.purge();
      timer = null;
      isScheduled = false;
    }
    System.out.println("Progress monitor stoped.");
  }
  public void start() {
    //System.out.println("Try to start progress monitor.");
    if (timer == null) {
      timer = new Timer();
    }
    timer.schedule(this, 1000, progressInterval);
    isScheduled = true;
    System.out.println("Progress monitor started.");
  }
  /**
   * 打印progress信息
   * 
   * @param transfered
   */
  private void sendProgressMessage(long transfered) {
    if (fileSize != 0) {
      double d = ((double) transfered * 100) / (double) fileSize;
      DecimalFormat df = new DecimalFormat("#.##");
      System.out.println("Sending progress message: " + df.format(d) + "%");
    } else {
      System.out.println("Sending progress message: " + transfered);
    }
  }
  /**
   * 实现了SftpProgressMonitor接口的count方法
   */
  public boolean count(long count) {
    if (isEnd())
      return false;
    if (!isScheduled) {
      start();
    }
    add(count);
    return true;
  }
  /**
   * 实现了SftpProgressMonitor接口的end方法
   */
  public void end() {
    setEnd(true);
    System.out.println("transfering end.");
  }
  private synchronized void add(long count) {
    transfered = transfered + count;
  }
  private synchronized long getTransfered() {
    return transfered;
  }
  public synchronized void setTransfered(long transfered) {
    this.transfered = transfered;
  }
  private synchronized void setEnd(boolean isEnd) {
    this.isEnd = isEnd;
  }
  private synchronized boolean isEnd() {
    return isEnd;
  }
  public void init(int op, String src, String dest, long max) {
    // Not used for putting InputStream
  }
}

------------------------------------------------------------------------------------------------

以上是SFTP协议的一些方法,下面介绍smb协议操作,依赖的包是jcifs-1.3.18.jar:

package com.mysite.smb;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
import jcifs.smb.SmbFileOutputStream;
/**
 * 
 * @author T430
 * 
 */
public class SmbUtil {
  /**
   * 方法一:
   * 
   * @param remoteUrl
   *            远程路径 smb://192.168.75.204/test/新建 文本文档.txt
   * @throws IOException
   */
  public static void smbGet(String remoteUrl) throws IOException {
    SmbFileInputStream in = null;
    try {
      SmbFile smbFile = new SmbFile(remoteUrl);
      int length = smbFile.getContentLength();// 得到文件的大小
      byte buffer[] = new byte[length];
      in = new SmbFileInputStream(smbFile);
      // 建立smb文件输入流
      while ((in.read(buffer)) != -1) {
        System.out.write(buffer);
        System.out.println(buffer.length);
      }
    } catch (IOException e) {
      throw new IOException(e);
    } finally {
      try {
        if (in != null)
          in.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
  // 从共享目录下载文件
  /**
   * 方法二: 路径格式:smb://192.168.75.204/test/新建 文本文档.txt
   * smb://username:password@192.168.0.77/test
   * 
   * @param remoteUrl
   *            远程路径
   * @param localDir
   *            要写入的本地路径
   * @throws Exception
   */
  public static void smbGet(String remoteUrl, String localDir) throws IOException {
    InputStream in = null;
    OutputStream out = null;
    try {
      SmbFile remoteFile = new SmbFile(remoteUrl);
      String fileName = remoteFile.getName();
      File localFile = new File(localDir + File.separator + fileName);
      in = new BufferedInputStream(new SmbFileInputStream(remoteFile));
      out = new BufferedOutputStream(new FileOutputStream(localFile));
      byte[] buffer = new byte[1024];
      while (in.read(buffer) != -1) {
        out.write(buffer);
        buffer = new byte[1024];
      }
    } catch (IOException e) {
      throw new IOException(e);
    } finally {
      try {
        if (out != null)
          out.close();
        if (in != null)
          in.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
  /**
   * 向共享目录上传文件 远程url smb://192.168.1.77/test 如果需要用户名密码就这样:
   * smb://username:password@192.168.1.77/test
   * 
   * @param remoteUrl
   * @param localFilePath
   * @throws Exception
   */
  public static void smbPut(String remoteUrl, String localFilePath) throws IOException {
    InputStream in = null;
    OutputStream out = null;
    try {
      File localFile = new File(localFilePath);
      String fileName = localFile.getName();
      SmbFile remoteFile = new SmbFile(remoteUrl + "/" + fileName);
      in = new BufferedInputStream(new FileInputStream(localFile));
      out = new BufferedOutputStream(new SmbFileOutputStream(remoteFile));
      byte[] buffer = new byte[1024];
      while (in.read(buffer) != -1) {
        out.write(buffer);
        buffer = new byte[1024];
      }
    } catch (IOException e) {
      throw new IOException(e);
    } finally {
      try {
        if (out != null)
          out.close();
        if (in != null)
          in.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}


FTP的操作,依赖包commons-net-3.3.jar:

package com.mysite.ftp;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
public class FtpUtil {
  private static FTPClient ftp;
  /**
   * FTP上传
   * 
   * @param map
   * @param src
   */
  public static void upload(Map<String, Object> map, String src) {
    String host = map.get("host").toString();
    String userName = map.get("userName").toString();
    String password = map.get("password").toString();
    String path = map.get("path") == null ? "" : map.get("path").toString();
    String port = map.get("port") == null ? "21" : map.get("port").toString();
    upload(host, Integer.parseInt(port), userName, password, path, src);
  }
  /**
   * FTP上传
   * 
   * @param host
   * @param userName
   * @param password
   * @param path
   * @param src
   */
  public static void upload(String host, int port, String userName, String password, String path, String src) {
    try {
      boolean isConnect = connect(path, host, port, userName, password);
      if(isConnect){
        File file = new File(src);
        upload(file);
      }
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("FTP客户端出错!", e);
    } finally {
      try {
        ftp.disconnect();
      } catch (IOException e) {
        e.printStackTrace();
        throw new RuntimeException("关闭FTP连接发生异常!", e);
      }
    }
  }
  /**
   * 
   * @param path
   *            上传到ftp服务器哪个路径下
   * @param addr
   *            地址
   * @param port
   *            端口号
   * @param username
   *            用户名
   * @param password
   *            密码
   * @return
   * @throws Exception
   */
  private static boolean connect(String path, String addr, int port, String username, String password) throws IOException {
    boolean result = false;
    ftp = new FTPClient();
    int reply;
    ftp.connect(addr, port);
    ftp.login(username, password);
    ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
    reply = ftp.getReplyCode();
    if (!FTPReply.isPositiveCompletion(reply)) {
      ftp.disconnect();
      return result;
    }
    ftp.changeWorkingDirectory(path);
    ftp.setBufferSize(1024);
    ftp.setControlEncoding("GBK");
    result = true;
    return result;
  }
  /**
   * 
   * @param file
   *            上传的文件或文件夹
   * @throws Exception
   */
  private static void upload(File file) throws IOException {
    if (file.isDirectory()) {
      ftp.makeDirectory(file.getName());
      ftp.changeWorkingDirectory(file.getName());
      String[] files = file.list();
      for (int i = 0; i < files.length; i++) {
        File file1 = new File(file.getPath() + "\\" + files[i]);
        if (file1.isDirectory()) {
          upload(file1);
          ftp.changeToParentDirectory();
        } else {
          File file2 = new File(file.getPath() + "\\" + files[i]);
          FileInputStream input = new FileInputStream(file2);
          ftp.storeFile(file2.getName(), input);
          input.close();
        }
      }
    } else {
      File file2 = new File(file.getPath());
      FileInputStream input = new FileInputStream(file2);
      ftp.storeFile(file2.getName(), input);
      input.close();
    }
  }
  /**
   * FTP下载
   * 
   * @param host
   * @param userName
   * @param password
   * @param path
   * @param src
   */
  public static void download(String host, String userName, String password, String path, String src) {
    FTPClient ftpClient = new FTPClient();
    FileOutputStream fos = null;
    try {
      ftpClient.connect(host);
      ftpClient.login(userName, password);
      String remoteFileName = path;
      fos = new FileOutputStream(src);
      ftpClient.setBufferSize(1024);
      // 设置文件类型(二进制)
      ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
      ftpClient.retrieveFile(remoteFileName, fos);
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("FTP客户端出错!", e);
    } finally {
      IOUtils.closeQuietly(fos);
      try {
        ftpClient.disconnect();
      } catch (IOException e) {
        e.printStackTrace();
        throw new RuntimeException("关闭FTP连接发生异常!", e);
      }
    }
  }
}

Telnet协议,依赖包commons-net-3.3.jar:


package com.mysite.telnet;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.SocketException;
import org.apache.commons.net.telnet.TelnetClient;
public class TelnetUtil {
  private static InputStream in;
  private static PrintStream out;
  private static String prompt = "#";
  private static String os = "windows";
  public static void handle(String host, int port, String os, String user, String pwd) {
    TelnetClient telnet = null;
    try {
      if ("windows".equals(os)) {
        telnet = new TelnetClient("VT220");
        prompt = "";
      } else {
        TelnetUtil.os = os;
        telnet = new TelnetClient();
        prompt = user.equals("root") ? "#" : "$";
      }
      telnet.connect(host, port);
      telnet.setSoTimeout(60000);
      in = telnet.getInputStream();
      out = new PrintStream(telnet.getOutputStream());
      login(user, pwd);
      //sendCommand("cd /d \"D:/ftp/flume/bin\"");
      //sendCommand("start \"\" \"D:/ftp/flume/bin/startCWAgen.bat\"");
      sendCommand("java -jar \"D:/ftp/flume/lib/controller.jar\" \"D:/ftp/flume/lib/config.properties\"");
      readUntil();
    } catch (SocketException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
        if (telnet != null)
          telnet.disconnect();
      } catch (IOException e) {
        e.printStackTrace();
        try {
          throw new Exception("telnet 关闭失败");
        } catch (Exception e1) {
          e1.printStackTrace();
        }
      }
    }
  }
  public static void handle(String host, String os, String user, String pwd) {
    int port = 23;
    handle(host, port, os, user, pwd);
  }
  private static void login(String user, String password) {
    readUntil("login:");
    write(user);
    if ("windows".equals(os)) {
      readUntil("password:");
    } else {
      readUntil("Password:");
    }
    write(password);
    readUntil(prompt + " ");
  }
  private static void su(String password) {
    try {
      write("su");
      readUntil("Password: ");
      write(password);
      prompt = "#";
      readUntil(prompt + " ");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  private static void write(String value) {
    try {
      out.println(value);
      out.flush();
      System.out.println(value);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  public static String sendCommand(String command) {
    try {
      write(command);
      return readUntil(prompt + " ");
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
  private static String readUntil() {
    InputStreamReader isr = null;
    BufferedReader br = null;
    try {
      //StringBuffer sb = new StringBuffer();
      isr = new InputStreamReader(in, "GBK");
      br = new BufferedReader(isr);
      int str = 0;
      while ((str = br.read()) != -1) {
        //sb.append((char) str);
        System.out.print((char) str);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
        if (isr != null)
          isr.close();
        if (br != null)
          br.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return null;
  }
  private static String readUntil(String pattern) {
    StringBuffer sb = new StringBuffer();
    try {
      char lastChar = pattern.charAt(pattern.length() - 1);
      char ch = (char) in.read();
      while (true) {
        sb.append(ch);
        if (ch == lastChar) {
          if (sb.toString().endsWith(pattern)) {
            byte[] temp = sb.toString().getBytes("iso8859-1");// 处理编码,界面显示乱码问题
            return new String(temp, "GBK");
          }
        }
        ch = (char) in.read();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return sb.toString();
  }
  public static void main(String[] args) {
    TelnetUtil.handle("127.0.0.1", "windows", "bob", "123");
  }
}



相关文章
61 Flume采集系统结构图
61 Flume采集系统结构图
36 0
61 Flume采集系统结构图
|
存储 监控
63 Flume采集目录到HDFS
63 Flume采集目录到HDFS
75 0
|
数据采集 缓存 监控
Apache Flume-案例-监控采集文件夹变化 (exec source)|学习笔记
快速学习 Apache Flume-案例-监控采集文件夹变化 (exec source)
Apache Flume-案例-监控采集文件夹变化 (exec source)|学习笔记
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
6月前
|
数据采集 分布式计算 Java
【数据采集与预处理】流数据采集工具Flume
【数据采集与预处理】流数据采集工具Flume
159 8
|
6月前
|
消息中间件 存储 监控
flume采集的一些特性
flume采集的一些特性
|
监控 Java
64 Flume采集文件到HDFS
64 Flume采集文件到HDFS
58 0
|
消息中间件 数据采集 JSON
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)