socket之flume监控系统

简介: socket之flume监控系统

公司要求写一个监控系统用来监控2000台flume服务器的运行情况,还必须对服务器进行一些操作,如:重新启动flume,停止flume,更新flume文件等。这里也走了很多弯路,希望能给其他人前车之鉴。

首先来分析系统需求:一、要不间断的读写flume服务器的运行情况,用JSP展示出来;二、要提供一些对服务器的操作动作(上面有提到)。

初步设想是一个socket服务端,一个socket客户端各两个线程应该能解决。

服务端不停的接收客户端发送的状态信息,并且监听指令输入进而对flume服务器进行相应的操作。

客户端采用心跳模式,发送状态信息,并且监听服务端发来的指令,进而执行相关操作。

后来经过一系列测试,是不行的。原因如下:

服务端实现多线程来不间断的接收客户端消息,对客户端发送重启、停止的指令可以顺利执行。但是当传输文件的时候就有问题了。写完文件的的时候必须关掉输出流,否则客户端无法真正读取文件,处在阻塞状态;但是如果关闭了输出流,socket也相应关闭了,就不能继续接收客户端的状态信息了。后来网上有说使用半关闭:shutdownOutputStream();貌似实现了传输文件及继续监听客户机状态信息的功能,但当你再次发生指令的时候,问题来了,之前socket的outputStream被半关闭,没办法再对客户机发送指令。

想来想去,没有好的解决办法,只能把程序拆分。服务端一分为二,客户端也是一分为二。这样就有两个服务端,两个客户端。

服务机:1、不间断的接收客户端的消息。2、当有指令过来的时候跟客户机建立连接操作完成后断开连接;

客户机:1、不间断的发送本地flume运行情况。2、监听客户端的指令输入,进而操作。

服务机1部分的实现代码:

public class NioServer {
  private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>();
  private List<Socket> list = new ArrayList<Socket>();
  private static final int BUFFER_SIZE = 1024;// 缓存大小
  private ServerSocketChannel serverChannel;
  ExecutorService executor = Executors.newCachedThreadPool();
  public NioServer(ServletContext servletContext) {
    try {
      executor.execute(new Handler(servletContext));
      executor.execute(new Task());
    } catch (IOException e) {
      System.err.println(e.getMessage());
      //e.printStackTrace();
    }
  }
  class Handler implements Runnable {
    private ServletContext servletContext;
    private Selector selector;
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); // 字节转字符
    private ByteBuffer buffer;
    public Handler(ServletContext servletContext) throws IOException {
      this.servletContext = servletContext;
    }
    public void run() {
      try {
        // 获取一个ServerSocket通道
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        // 从web.xml中context-param节点获取socket端口
        String port = this.servletContext
            .getInitParameter("socketPort");
        serverChannel.socket().bind(
            new InetSocketAddress(Integer.parseInt(port)));
        selector = Selector.open();// 获取通道管理器
        // 将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Waiting for client connect……");
        while (true) {
          selector.select();
          for (Iterator<SelectionKey> itr = selector.selectedKeys()
              .iterator(); itr.hasNext();) {
            SelectionKey key = itr.next();
            listen(key);
            itr.remove();
          }
        }
      } catch (IOException e) {
        if(!serverChannel.isOpen()||!selector.isOpen()||serverChannel.socket().isClosed()){
          new NioServer(servletContext);
        }
      }
    }
    // 监听
    private void listen(SelectionKey key) throws IOException {
      if (key.isAcceptable()) {// 客户端请求连接事件
        ServerSocketChannel server = (ServerSocketChannel) key
            .channel();
        SocketChannel channel = server.accept();// 获得客户端连接通道
        Socket s=channel.socket();
        s.setKeepAlive(true);
        s.setSoTimeout(1000);
        list.add(channel.socket());
        channel.configureBlocking(false);
        System.out.println("a new client connected!");
        // 在与客户端连接成功后,为客户端通道注册SelectionKey.OP_READ事件。
        channel.register(selector, SelectionKey.OP_READ);
      } else if (key.isReadable()) {// 有可读数据事件
        // 获取客户端传输数据可读取消息通道。
        SocketChannel channel = (SocketChannel) key.channel();
        buffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
          int len = channel.read(buffer);
          if (len <= 0) {
            return;
          }else{
            buffer.flip();
            DateFormat df = new SimpleDateFormat(
                "yyyy-MM-dd HH:mm:ss");
            String createTime = df.format(new Date());
            UUID uuid = UUID.randomUUID();
            CharBuffer charBuffer = decoder.decode(buffer);
            String msg = charBuffer.toString();
            String[] strs = msg.split(",");
            FlumeInfo info = new FlumeInfo();
            for (String str : strs) {
              String[] texts = str.split("=");
              String s = texts[0];
              if ("name".equals(s)) {
                info.setName(texts[1]);
              } else if ("ip".equals(s)) {
                info.setIP(texts[1].replace("/", ""));
              } else if ("port".equals(s)) {
                info.setPort(texts[1]);
              } else if ("status".equals(s)) {
                String status = texts[1];
                if (status.equalsIgnoreCase("true")) {
                  status = "1";
                } else {
                  status = "0";
                }
                info.setStatus(status);
              } else if ("version".equals(s)) {
                info.setVersion(texts[1]);
              }
            }
            info.setCode("code");
            info.setSystemId(uuid.toString());
            info.setCreateTime(createTime);
            queue.put(info);
            buffer.clear();
          }
        } catch (Exception e) {
        }
      }
    }
  }
  class Task implements Runnable {
    private Connection conn;
    private PreparedStatement pstmt;
    public Task() {
      DBHelper helper = new DBHelper();
      conn = helper.getConnection();
    }
    public void run() {
      try {
        while (true) {
          FlumeInfo flume = queue.take();
          StringBuffer sb = new StringBuffer("insert into TB1");
          sb.append(" (systemId,name,code,ip,port,status,createTime,version) values (?,?,?,?,?,?,?,?)");
          pstmt = conn.prepareStatement(sb.toString());
          pstmt.setString(1, flume.getSystemId());
          pstmt.setString(2, flume.getName());
          pstmt.setString(3, flume.getCode());
          pstmt.setString(4, flume.getIP());
          pstmt.setString(5, flume.getPort());
          pstmt.setString(6, flume.getStatus());
          pstmt.setString(7, flume.getCreateTime());
          pstmt.setString(8, flume.getVersion());
          pstmt.executeUpdate();
        }
      } catch (Exception e) {
        //e.printStackTrace();
        System.err.println(e.getMessage());
      } finally {
        try {
          if (pstmt != null)
            pstmt.close();
          if (conn != null)
            conn.close();
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    }
  }
  // 关闭服务
  public void closeServerSocket() {
    try {
      if (serverChannel != null && serverChannel.isOpen()) {
        serverChannel.close();
      }
    } catch (Exception ex) {
      System.out.println("SocketThread err:" + ex.getMessage());
    }
  }
}



服务端2的实现代码:

public class FileSkClient {
  private static final int BUFFER_SIZE = 1024;// 缓存大小
  private static final String IP = "127.0.0.1";
  private static final int PORT = 3356;
  private String filePath = null;
  private Socket socket = null;
  private String savePath = null;
  BufferedReader br = null;
  PrintWriter out = null;
  private int sumL = 0;
  public String handle(String IP, String command, String filePath,
      String savePath) throws Exception {
    String result = "FAILED";
    try {
      socket = new Socket(IP, PORT);
      this.filePath = filePath;
      this.savePath = savePath;
      br = new BufferedReader(new InputStreamReader(
          socket.getInputStream()));
      out = new PrintWriter(socket.getOutputStream());
      if ("0".equals(command)) {
        out.print("restart");
        out.flush();
        result = br.readLine();
        System.out.println(result);
      } else if ("1".equals(command)) {
        out.print("replace");
        out.flush();
        String isReady = br.readLine();
        if ("ready".equals(isReady)) {
          sumL = 0;
          boolean b=findFile(socket);
          if(b)result="SUCCESS";
          System.out.println(result);
        }
      } else if ("2".equals(command)) {
        out.print("stop");
        out.flush();
        result = br.readLine();
        System.out.println(result);
      }
    } catch (Exception e) {
      throw new Exception(e);
    } finally {
      if(out!=null)out.close();
      if(br!=null)br.close();
      if(socket!=null)socket.close();
    }
    return result;
  }
  // 选择文件
  private boolean findFile(Socket s) throws Exception {
    System.out.println("Begin to zip the file...");
    File f = FileUtil.doZip(filePath);
    if (f == null) {
      System.err.println("文件打包异常,无法继续");
      //throw new FileNotFoundException("文件打包异常,无法继续");
      return false;
    }
    return sendFile(s, f);
  }
  // 发送文件
  private boolean sendFile(Socket s, File file) throws Exception {
    DataOutputStream dos = null;
    DataInputStream dis = null;
    try {
      dos = new DataOutputStream(new BufferedOutputStream(
          s.getOutputStream()));
      long l = file.length();
      String dir = file.getName();
      if (savePath != null && !"".equals(savePath)) {
        dir = savePath + File.separator + file.getName();
      }
      dos.writeUTF(dir);
      dos.writeLong(l);
      dos.flush();
      dis = new DataInputStream(new BufferedInputStream(
          new FileInputStream(file)));
      byte[] bs = new byte[BUFFER_SIZE];
      int length = 0;
      System.out.println("开始传输文件...");
      while ((length = dis.read(bs)) > 0) {
        sumL += length;
        dos.write(bs, 0, length);
        dos.flush();
      }
      dos.close();
      dis.close();
      if (sumL == l) {
        System.out.println("文件传输完成");
        return true;
      }
    } catch (Exception e) {
      throw new Exception("发送文件失败:" + e.getMessage());
    } finally {
      try {
        if (dos != null)
          dos.close();
        if (dis != null)
          dis.close();
        if (s != null)
          s.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    return false;
  }
  public static void main(String[] args) {
    System.out.println("请输入命令(0--重启、1--覆盖重启、2--停止):");
    Scanner sc = new Scanner(System.in);
    String str = sc.nextLine();
    String filePath = "";
    String savePath = "";
    try {
      new FileSkClient().handle(IP, str, filePath, savePath);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}





客户机1实现代码:

public class MonitorClient {
  private static final Logger logger = LoggerFactory
      .getLogger(MonitorClient.class);
  private Runtime rt = Runtime.getRuntime();
  private Socket s = null;
  private PrintWriter w = null;
  private Timer timer = new Timer();
  private static int i = 0;
  private String hostName = InetAddress.getLocalHost().getCanonicalHostName();// 主机名
  private static final String configFile = "config.properties";//版本号存放文件
  private Properties pro = new Properties();// 加载属性文件,读取数据库连接配置信息
  private static final int TimeOut=60000;//发送数据间隔时间
  public static void main(String[] args) throws Exception {
    new MonitorClient();
  }
  public MonitorClient() throws Exception {
    try {
      pro.load(MonitorClient.class.getResourceAsStream(configFile));
      String IP=pro.getProperty("server.ip");
      String port=pro.getProperty("server.port");
      pro.clear();
      s = new Socket(IP, Integer.parseInt(port));
      s.setKeepAlive(true);
      System.out.println("connect successed!");
      w = new PrintWriter(s.getOutputStream());
      timer.schedule(new Processor(), 1000, TimeOut);// 在*秒后执行此任务,每次间隔*秒.
    } catch (Exception e) {
      System.err.println("Run error:" + e.getMessage());
      if (i < 50) {
        new MonitorClient();// 重试
        i++;
      }else{
        if(w!=null)w.close();
        destroyedTimer();
        if(s!=null)s.close();
      }
    }
  }
  public void destroyedTimer() {
    if (timer != null) {
      timer.cancel();
    }
  }
  // 处理类
  class Processor extends TimerTask {
    public void run() {
      try {
        boolean isAlive = isAlive();
        String version = getVersion();
        StringBuffer sb = new StringBuffer();
        sb.append("name="+hostName + ",");
        sb.append("ip="+s.getLocalAddress() + ",");
        sb.append("port="+s.getLocalPort() + ",");
        sb.append("status="+isAlive + ",");
        sb.append("version="+version);
        w.println(sb.toString());
        w.flush();
      } catch (Exception e) {
        e.printStackTrace();
        logger.info("Run error!" + e.getMessage());
      }
    }
  }
  // 判断是否是windows操作系统
  private boolean isWin() {
    String os = System.getProperty("os.name").toLowerCase();
    if (os.startsWith("win")) {// 如果是Windows操作系统
      return true;
    }
    return false;
  }
  /**
   * 判断是否运行
   * 
   * @param rt
   * @return
   * @throws IOException
   */
  private boolean isAlive() {
    boolean isAlive = false;
    try {
      if (isWin()) {
        String program = "CWAgen.exe";
        String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
        Process p = rt.exec(command);
        BufferedReader read = new BufferedReader(new InputStreamReader(
            p.getInputStream()));
        String line = "";
        while ((line = read.readLine()) != null) {
          if (line.indexOf(program) != -1) {
            System.out.println("This application is running:" + line);
            isAlive = true;
          } else {
            System.out.println("This application is closed");
          }
        }
        read.close();
        p.destroy();
      }else{
        String command = "ps -ef | grep \"CWAgen\" | wc -l";
        Process p = rt.exec(command);
        BufferedReader read = new BufferedReader(new InputStreamReader(
            p.getInputStream()));
        String line = "";
        while ((line = read.readLine()) != null) {
          if (line.equals("1")) {
            System.out.println("This application is running:" + line);
            isAlive = true;
          } else if (line.equals("0")){
            System.out.println("This application is closed");
          }
        }
        read.close();
        p.destroy();
        
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return isAlive;
  }
  /**
   * 获取版本号
   * 
   * @return
   */
  public String getVersion() {
    try {
      pro.load(MonitorClient.class.getResourceAsStream(configFile));
      return pro.getProperty("flume.version");
    } catch (IOException e) {
      e.printStackTrace();
      return null;
    }finally{
      pro.clear();
    }
  }
  
}

客户机2部分实现代码:

public class FileSkServer {
  ExecutorService exec = Executors.newCachedThreadPool();
  private static final String SUCCESS_MSG = "SUCCESS";// 成功标识符
  private static final String FAILED_MSG = "FAILED";// 失败标识符
  private static final String REPLACE = "replace";// 操作命令
  private static final String RESTART = "restart";// 操作命令
  private static final String STOP = "stop";// 操作命令
  private static final int BUFFER_SIZE = 1024;// 缓存大小
  private static final String configFile = "config.properties";// 版本号存放文件
  private Properties pro = new Properties();// 加载属性文件,读取数据库连接配置信息
  private String PATH = "";
  private Runtime rt = Runtime.getRuntime();
  private static boolean flag = true;
  Socket socket = null;
  public FileSkServer() {
    ServerSocket server = null;
    try {
      pro.load(FileSkServer.class.getResourceAsStream(configFile));
      String port = pro.getProperty("fileServer.port");
      server = new ServerSocket(Integer.parseInt(port));// 绑定接受数据端口
      System.out.println("Waiting for client connect……");
      while (flag) {
        socket = server.accept();
        exec.execute(new Handler(socket));
      }
    } catch (IOException e) {
      //e.printStackTrace();
      if(server.isClosed()||socket.isClosed()){
        new FileSkServer();
      }
    } finally {
    }
  }
  public static void main(String[] args) {
    new FileSkServer();
  }
  // 判断是否是windows操作系统
  private boolean isWin() throws IOException {
    boolean flag = false;
    String os = System.getProperty("os.name").toLowerCase();
    pro.load(FileSkServer.class.getResourceAsStream(configFile));
    if (os.startsWith("win")) {// 如果是Windows操作系统
      flag = true;
    }else{
      PATH = pro.getProperty("flume_home_linux");
    }
    return flag;
  }
  // 判断系统架构(32位或64位)
  private boolean is64bit() throws IOException {
    boolean flag = false;
    String os = System.getProperty("os.arch");
    pro.load(MonitorClient.class.getResourceAsStream(configFile));
    if (os.contains("86") || os.contains("32")) {
      PATH = pro.getProperty("flume_home_win_32");
    } else if (os.contains("64")) {
      PATH = pro.getProperty("flume_home_win_64");
      flag= true;
    }
    return flag;
  }
  // 停止系统
  private boolean stop() {
    System.out.println("Begin to stop this application...");
    try {
      String command = "";
      if (isWin()) {
        is64bit();
        command = "cmd /c " + PATH + "bin\\stopCWAgen.bat start";
      } else {
        command = PATH + "bin/stopCWAgen.sh";
      }
      if (isAlive()) {// 结束TOMCAT进程
        Process p= rt.exec(command, null, new File(PATH + "bin\\"));
        final BufferedReader br2 = new BufferedReader(
            new InputStreamReader(p.getErrorStream(), "UTF-8"));
        final BufferedReader br = new BufferedReader(
            new InputStreamReader(p.getInputStream(), "UTF-8"));
        Thread t1 = new Thread() {
          public void run() {
            String line = null;
            try {
              while ((line = br2.readLine()) != null) {
                System.out.println(line);
              }
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        };
        Thread t2 = new Thread() {
          public void run() {
            String line = null;
            try {
              while ((line = br.readLine()) != null) {
                System.out.println(line);
              }
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        };
        t1.start();
        t2.start();
        // if (p.waitFor() != 0) {
        // if (p.exitValue() == 1) {// 0表示正常结束,1:非正常结束
        // System.err.println("Stop Failed!");
        // }
        // }
        if (isAlive())
          kill();// 强制结束进程
      }
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }
  // 启动系统
  private String start() {
    System.out.println("starting run this application...");
    String result = FAILED_MSG;
    try {
      if (!isAlive()) {
        String command = "";
        Process p = null;
        if (isWin()) {
          if (is64bit()) {
            command = "cmd /c " + PATH
                + "bin\\startCWAgen.bat start";
          } else {
            command = "cmd /c " + PATH
                + "bin\\startCWAgen_32.bat start";
          }
        } else {
          command = PATH + "bin/startup.sh";
        }
        p = rt.exec(command, null, new File(PATH + "bin\\"));
        final BufferedReader br2 = new BufferedReader(
            new InputStreamReader(p.getErrorStream(), "UTF-8"));
        final BufferedReader br = new BufferedReader(
            new InputStreamReader(p.getInputStream(), "UTF-8"));
        Thread t1 = new Thread() {
          public void run() {
            String line = null;
            try {
              while ((line = br2.readLine()) != null) {
                System.out.println(line);
              }
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        };
        Thread t2 = new Thread() {
          public void run() {
            String line = null;
            try {
              while ((line = br.readLine()) != null) {
                System.out.println(line);
              }
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        };
        t1.start();
        t2.start();
        // if (p.waitFor() != 0) {
        // if (p.exitValue() == 1) {
        // System.err.println("Start failed!");
        // return FAILED_MSG;
        // }
        // }
        if (isAlive()) {
          System.out.println("Start successfully!");
          return SUCCESS_MSG;
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return result;
  }
  /**
   * 判断是否运行
   * 
   * @param rt
   * @return
   * @throws IOException
   */
  private boolean isAlive() {
    boolean isAlive = false;
    try {
      if (isWin()) {
        String program = "CWAgen.exe";
        String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
        Process p = rt.exec(command);
        BufferedReader read = new BufferedReader(new InputStreamReader(
            p.getInputStream()));
        String line = "";
        while ((line = read.readLine()) != null) {
          if (line.indexOf(program) != -1) {
            System.out.println("This application is running:"
                + line);
            isAlive = true;
          } else {
            System.out.println("This application is closed");
          }
        }
        read.close();
        p.destroy();
      } else {
        String command = "ps -ef | grep \"CWAgen\" | wc -l";
        Process p = rt.exec(command);
        BufferedReader read = new BufferedReader(new InputStreamReader(
            p.getInputStream()));
        String line = "";
        while ((line = read.readLine()) != null) {
          if (line.equals("1")) {
            System.out.println("This application is running:"
                + line);
            isAlive = true;
          } else if (line.equals("0")) {
            System.out.println("This application is closed");
          }
        }
        read.close();
        p.destroy();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return isAlive;
  }
  /**
   * 强制结束进程
   * 
   * @param rt
   * @throws IOException
   */
  public boolean kill() {
    try {
      System.out.println("Begin to stop this application...");
      if (isAlive()) {
        Process p = null;
        if (isWin()) {
          String cmd = "taskkill -f -im CWAgen.exe";
          p = rt.exec(cmd);
        } else {
          String[] cmd = { "sh", "-c", "ps aux | grep CWAgen" };
          p = rt.exec(cmd);
          InputStreamReader is = new InputStreamReader(
              p.getInputStream());
          BufferedReader read = new BufferedReader(is);
          String line = null;
          while ((line = read.readLine()) != null) {
            if (line.indexOf("org.apache.catalina.startup.Bootstrap start") >= 0) {
              String tomcatPid = line.split("\\s+")[1];
              rt.exec("kill -9 " + tomcatPid);
            }
          }
          is.close();
          read.close();
        }
        if (p.waitFor() != 0) {
          if (p.exitValue() == 1) {
            System.err.println("Start failed!");
          }
        }
        p.destroy();
      }
      if (!isAlive()) {
        System.out.println("Stop successfully!");
        return true;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }
  /**
   * 命令处理类
   * 
   * @author T430
   * 
   */
  class Handler implements Runnable {
    Socket s;
    double sumL = 0;
    PrintWriter w = null;
    InputStream in = null;
    public Handler(Socket s) {
      this.s = s;
    }
    public void run() {
      DataInputStream dis = null;
      BufferedOutputStream bout = null;
      try {
        String command = null;
        if (flag) {
          int len = 0;
          byte[] b = new byte[BUFFER_SIZE];
          in = s.getInputStream();
          w = new PrintWriter(s.getOutputStream());
          if ((len = in.read(b)) != -1) {
            command = new String(b, 0, len);
            System.out.println(s.getLocalPort() + "command: "
                + command);
          }
        }
        if (REPLACE.equalsIgnoreCase(command)) {
          flag = false;
          stop();
          System.out.println(PATH);
          w.println("ready");
          w.flush();
          dis = new DataInputStream(in);
          String fileName = dis.readUTF();
          long fileLength = dis.readLong();
          String dir = PATH + fileName;
          File file = new File(dir);
          if (!file.exists()) {
            file.createNewFile();
          }
          byte[] buf = new byte[BUFFER_SIZE];
          int read = 0;
          System.out.println("Begin to receive file(" + fileName
              + "),size(" + fileLength + "B)...");
          bout = new BufferedOutputStream(new FileOutputStream(file));
          while ((read = dis.read(buf)) != -1) {
            sumL += read;
            bout.write(buf, 0, read);
            bout.flush();
          }
          bout.close();
          dis.close();
          if (fileLength == sumL) {
            System.out.println("Received over!The save path:"
                + file.getPath());
            w.println("Received over!The save path:"
                + file.getPath());
            w.flush();
            boolean isUnZip = unZip(file, file.getParent());// 解压文件
            if (isUnZip) {
              String result = start();
              w.println(result);
              w.flush();
            } else {
              w.println("Failed");
              w.flush();
            }
          }
          flag = true;
        } else if (RESTART.equalsIgnoreCase(command)) {
          String result = FAILED_MSG;
          if (stop()) {
            result = start();
          }
          w.println(result);
          w.flush();
        } else if (STOP.equalsIgnoreCase(command)) {
          boolean b = stop();
          String result = FAILED_MSG;
          if (b)
            result = SUCCESS_MSG;
          w.println(result);
          w.flush();
        }
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        try {
          if (w != null)
            w.close();
          if (dis != null)
            dis.close();
          if (bout != null)
            bout.close();
          if (s != null)
            s.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
    /**
     * 解压一个文件
     * 
     * @param zipfilename
     *            解压的文件
     * @param destDir
     *            解压的目录
     */
    private boolean unZip(File file, String destDir) {
      if (destDir == null)
        destDir = file.getParent();
      OutputStream os = null;
      InputStream is = null;
      if (!file.isFile() || !file.getName().endsWith(".zip")) {
        System.out.println("The file cannot unzip");
        return false;
      } else {
        try {
          int length;
          byte b[] = new byte[1024];
          destDir = destDir.endsWith("\\") ? destDir : destDir + "\\";
          ZipFile zipFile = new ZipFile(file);
          Enumeration<?> enumeration = zipFile.entries();
          ZipEntry zipEntry = null;
          System.out.println("Begin to unzip the file...");
          while (enumeration.hasMoreElements()) {
            zipEntry = (ZipEntry) enumeration.nextElement();
            File loadFile = new File(destDir + zipEntry.getName());
            // 判断压缩文件中的某个条目是文件夹还是文件
            if (zipEntry.isDirectory()) {// 如果是目录,那么判断该文件是否已存在并且不是一个文件夹,解决空文件夹解压后不存在的问题
              if (!loadFile.exists()) {
                loadFile.mkdirs();
              }
            } else {
              if (!loadFile.getParentFile().exists()) {
                loadFile.getParentFile().mkdirs();
              }
              os = new FileOutputStream(loadFile);
              is = zipFile.getInputStream(zipEntry);
              while ((length = is.read(b)) > 0) {
                os.write(b, 0, length);
                os.flush();
              }
            }
          }
          zipFile.close();
          is.close();
          os.close();
          file.delete();
          System.out.println("unzip successed");
          return true;
        } catch (Exception e) {
          e.printStackTrace();
          return false;
        }
      }
    }
  }
}


写到这里基本能实现上述需求,接下来是服务端的界面。

用到了Spring MVC,因为对这个不是很了解,所有写的很简单。

总体是采用Spring mvc+mybatis的架构,数据库是DB2,用到了EasyUI 1.4(很强大,以后丰富一下这方面的知识)

整个项目,包括建表语句、客户机实现都提供了下载:http://download.csdn.net/detail/tiantang_1986/8142325

相关文章
|
6月前
|
消息中间件 监控 网络协议
Flume系统
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输系统,起源于Cloudera。【2月更文挑战第8天】
72 4
61 Flume采集系统结构图
61 Flume采集系统结构图
36 0
61 Flume采集系统结构图
|
数据采集 缓存 监控
Apache Flume-案例-监控采集文件夹变化 (exec source)|学习笔记
快速学习 Apache Flume-案例-监控采集文件夹变化 (exec source)
Apache Flume-案例-监控采集文件夹变化 (exec source)|学习笔记
|
1月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
60 3
|
3月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
71 0
|
6月前
|
存储 监控 Linux
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
【2月更文挑战第17天】Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
190 1
Flume【部署 02】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
|
6月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
126 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
6月前
|
存储 监控 Linux
Ganglia【部署 01】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
Ganglia【部署 01】Flume监控工具Ganglia的安装与配置(CentOS 7.5 在线安装系统监控工具Ganglia + 权限问题处理 + Flume接入监控配置 + 图例说明)
113 0
|
存储 监控 数据可视化
大数据Flume数据流监控
大数据Flume数据流监控
101 0
|
存储 监控 中间件
【Flume中间件】(3)实时监听文件到HDFS系统
【Flume中间件】(3)实时监听文件到HDFS系统
134 5
【Flume中间件】(3)实时监听文件到HDFS系统