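My company asked for a monitoring system to watch the running state of 2,000 Flume servers. It also had to perform some operations on those servers, such as restarting Flume, stopping Flume, and updating Flume files. I took quite a few detours along the way, and I hope this write-up saves others from the same mistakes.
First, the requirements: (1) continuously collect the running state of the Flume servers and display it in a JSP page; (2) provide the server operations mentioned above.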
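My first idea was one socket server and one socket client, each running two threads, which I thought should be enough.
The server would continuously receive the status messages sent by the clients, and also listen for operator commands and carry out the corresponding operations on the Flume servers.
The client would work in heartbeat mode: send status messages periodically, and also listen for commands from the server and execute them.
After a series of tests, this turned out not to work, for the following reasons: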
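The server used multiple threads to receive client messages without interruption, and restart and stop commands sent to a client executed without problems. File transfer was where things broke. After writing the file, the output stream had to be closed, otherwise the client never finished reading the file and stayed blocked. But closing the output stream also closes the socket, so the server could no longer receive status messages from that client. Some posts online suggested a half-close with Socket.shutdownOutput(). That appeared to allow both the file transfer and continued monitoring of the client's status, but the problem came back as soon as I sent another command: the socket's output stream had already been half-closed, so no further commands could be sent to the client.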
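One way to avoid having to close the stream at all is to send the file length first and have the receiver read exactly that many bytes. The sketch below is not the code this project ended up with; `FramedTransfer`, `sendFile`, `receiveFile` and `copyExactly` are hypothetical names, and in-memory streams stand in for the socket streams so that the example runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
final class FramedTransfer {

    // Sender: write name and length, then exactly `length` bytes. Flush, but do not close.
    static void sendFile(DataOutputStream out, String name, InputStream data, long length)
            throws IOException {
        out.writeUTF(name);
        out.writeLong(length);
        copyExactly(data, out, length);
        out.flush();
    }

    // Receiver: read name and length, then stop after `length` bytes instead of waiting for EOF.
    static String receiveFile(DataInputStream in, OutputStream dest) throws IOException {
        String name = in.readUTF();
        long length = in.readLong();
        copyExactly(in, dest, length);
        dest.flush();
        return name;
    }

    // Copy exactly `length` bytes; fail if the stream ends early.
    static void copyExactly(InputStream in, OutputStream out, long length) throws IOException {
        byte[] buf = new byte[1024];
        long remaining = length;
        while (remaining > 0) {
            int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (n < 0) {
                throw new EOFException("stream ended with " + remaining + " bytes missing");
            }
            out.write(buf, 0, n);
            remaining -= n;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] file = "hello flume".getBytes(StandardCharsets.UTF_8);

        // The "wire" stands in for the socket: a file followed by another command.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        sendFile(out, "agent.zip", new ByteArrayInputStream(file), file.length);
        out.writeUTF("restart");
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        ByteArrayOutputStream saved = new ByteArrayOutputStream();
        String name = receiveFile(in, saved);
        System.out.println(name + ": " + saved.size() + " bytes, next command: " + in.readUTF());
    }
}
```

Because the receiver stops at the declared length, the same connection stays usable for later commands and status messages. Whether this would have been enough to keep the original single-connection design is an assumption on my part; it only addresses the blocking read.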
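I thought it over and could not find a good solution, so I split the program. The server side is split in two, and so is the client side, which gives two server programs and two client programs.
Server machine: 1. Continuously receive messages from the clients. 2. When a command comes in, open a connection to the client machine, perform the operation, then disconnect.
Client machine: 1. Continuously send the running state of the local Flume. 2. Listen for commands from the server machine (which acts as the socket client on this second connection) and carry them out.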
Implementation of part 1 on the server machine:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.ServletContext;

// FlumeInfo and DBHelper are project classes that are not shown in this post.
public class NioServer {
    private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>();
    private List<Socket> list = new ArrayList<Socket>();
    private static final int BUFFER_SIZE = 1024; // buffer size
    private ServerSocketChannel serverChannel;
    ExecutorService executor = Executors.newCachedThreadPool();

    public NioServer(ServletContext servletContext) {
        try {
            executor.execute(new Handler(servletContext));
            executor.execute(new Task());
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }

    class Handler implements Runnable {
        private ServletContext servletContext;
        private Selector selector;
        private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); // bytes to characters
        private ByteBuffer buffer;

        public Handler(ServletContext servletContext) throws IOException {
            this.servletContext = servletContext;
        }

        public void run() {
            try {
                // Open a ServerSocket channel
                serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                // Read the socket port from the context-param node in web.xml
                String port = this.servletContext.getInitParameter("socketPort");
                serverChannel.socket().bind(new InetSocketAddress(Integer.parseInt(port)));
                selector = Selector.open(); // the channel manager
                // Bind the selector to the channel and register it for OP_ACCEPT events
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("Waiting for client connect……");
                while (true) {
                    selector.select();
                    for (Iterator<SelectionKey> itr = selector.selectedKeys().iterator(); itr.hasNext();) {
                        SelectionKey key = itr.next();
                        listen(key);
                        itr.remove();
                    }
                }
            } catch (IOException e) {
                if (!serverChannel.isOpen() || !selector.isOpen() || serverChannel.socket().isClosed()) {
                    new NioServer(servletContext);
                }
            }
        }

        // Handle one selected key
        private void listen(SelectionKey key) throws IOException {
            if (key.isAcceptable()) { // a client is requesting a connection
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel channel = server.accept(); // the client connection channel
                Socket s = channel.socket();
                s.setKeepAlive(true);
                s.setSoTimeout(1000);
                list.add(channel.socket());
                channel.configureBlocking(false);
                System.out.println("a new client connected!");
                // Once connected, register the client channel for OP_READ events
                channel.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) { // data is available to read
                SocketChannel channel = (SocketChannel) key.channel();
                buffer = ByteBuffer.allocate(BUFFER_SIZE);
                try {
                    int len = channel.read(buffer);
                    if (len < 0) {
                        // The client closed the connection: deregister the channel,
                        // otherwise the selector keeps reporting it as readable.
                        key.cancel();
                        list.remove(channel.socket());
                        channel.close();
                        return;
                    } else if (len == 0) {
                        return;
                    } else {
                        buffer.flip();
                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String createTime = df.format(new Date());
                        UUID uuid = UUID.randomUUID();
                        CharBuffer charBuffer = decoder.decode(buffer);
                        // The client sends with println, so strip the trailing line break
                        String msg = charBuffer.toString().trim();
                        String[] strs = msg.split(",");
                        FlumeInfo info = new FlumeInfo();
                        for (String str : strs) {
                            String[] texts = str.split("=");
                            if (texts.length < 2) {
                                continue; // skip fields without a value
                            }
                            String s = texts[0];
                            if ("name".equals(s)) {
                                info.setName(texts[1]);
                            } else if ("ip".equals(s)) {
                                info.setIP(texts[1].replace("/", ""));
                            } else if ("port".equals(s)) {
                                info.setPort(texts[1]);
                            } else if ("status".equals(s)) {
                                String status = texts[1];
                                if (status.equalsIgnoreCase("true")) {
                                    status = "1";
                                } else {
                                    status = "0";
                                }
                                info.setStatus(status);
                            } else if ("version".equals(s)) {
                                info.setVersion(texts[1]);
                            }
                        }
                        info.setCode("code");
                        info.setSystemId(uuid.toString());
                        info.setCreateTime(createTime);
                        queue.put(info);
                        buffer.clear();
                    }
                } catch (Exception e) {
                    // Report the failure instead of swallowing it silently
                    System.err.println("Read error: " + e.getMessage());
                }
            }
        }
    }

    class Task implements Runnable {
        private Connection conn;
        private PreparedStatement pstmt;

        public Task() {
            DBHelper helper = new DBHelper();
            conn = helper.getConnection();
        }

        public void run() {
            try {
                String sql = "insert into TB1"
                        + " (systemId,name,code,ip,port,status,createTime,version) values (?,?,?,?,?,?,?,?)";
                // Prepare once and reuse; preparing inside the loop leaked one statement per row
                pstmt = conn.prepareStatement(sql);
                while (true) {
                    FlumeInfo flume = queue.take();
                    pstmt.setString(1, flume.getSystemId());
                    pstmt.setString(2, flume.getName());
                    pstmt.setString(3, flume.getCode());
                    pstmt.setString(4, flume.getIP());
                    pstmt.setString(5, flume.getPort());
                    pstmt.setString(6, flume.getStatus());
                    pstmt.setString(7, flume.getCreateTime());
                    pstmt.setString(8, flume.getVersion());
                    pstmt.executeUpdate();
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                try {
                    if (pstmt != null)
                        pstmt.close();
                    if (conn != null)
                        conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Shut down the service
    public void closeServerSocket() {
        try {
            if (serverChannel != null && serverChannel.isOpen()) {
                serverChannel.close();
            }
        } catch (Exception ex) {
            System.out.println("SocketThread err:" + ex.getMessage());
        }
    }
}
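The server above treats each read as exactly one status message. TCP does not guarantee that, so a read can in principle return half a message or two messages at once. The following is a minimal sketch of a more defensive parser, assuming the client terminates each message with a line break (it sends with println) and that all fields are plain ASCII. `HeartbeatParser`, `feed` and `parseLine` are hypothetical names, not part of the project.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one instance per client connection.
final class HeartbeatParser {
    // Text received so far that does not yet end in a line break.
    private final StringBuilder pending = new StringBuilder();

    // Feed one decoded chunk; returns every complete message it finished.
    List<Map<String, String>> feed(String chunk) {
        pending.append(chunk);
        List<Map<String, String>> messages = new ArrayList<>();
        int nl;
        while ((nl = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, nl).trim(); // trim also drops a trailing \r
            pending.delete(0, nl + 1);
            if (!line.isEmpty()) {
                messages.add(parseLine(line));
            }
        }
        return messages;
    }

    // Split "name=a,ip=b,..." into fields; pairs without '=' are ignored.
    static Map<String, String> parseLine(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : line.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        HeartbeatParser parser = new HeartbeatParser();
        // A message arriving in two pieces, followed by the start of the next one.
        System.out.println(parser.feed("name=host1,ip=/10.0.0.5,po"));
        System.out.println(parser.feed("rt=5000,status=true,version=1.2\r\nname=ho"));
    }
}
```

With a heartbeat every 60 seconds and messages far smaller than the buffer, the simple version will rarely hit this case, so treat this as a precaution rather than a required change.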
Implementation of part 2 on the server machine:
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

// FileUtil is a project class that is not shown in this post.
public class FileSkClient {
    private static final int BUFFER_SIZE = 1024; // buffer size
    private static final String IP = "127.0.0.1";
    private static final int PORT = 3356;
    private String filePath = null;
    private Socket socket = null;
    private String savePath = null;
    BufferedReader br = null;
    PrintWriter out = null;
    private long sumL = 0; // bytes sent so far (long, to match File.length())

    public String handle(String IP, String command, String filePath, String savePath) throws Exception {
        String result = "FAILED";
        try {
            socket = new Socket(IP, PORT);
            this.filePath = filePath;
            this.savePath = savePath;
            br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream());
            if ("0".equals(command)) {
                out.print("restart");
                out.flush();
                result = br.readLine();
                System.out.println(result);
            } else if ("1".equals(command)) {
                out.print("replace");
                out.flush();
                String isReady = br.readLine();
                if ("ready".equals(isReady)) {
                    sumL = 0;
                    boolean b = findFile(socket);
                    if (b)
                        result = "SUCCESS";
                    System.out.println(result);
                }
            } else if ("2".equals(command)) {
                out.print("stop");
                out.flush();
                result = br.readLine();
                System.out.println(result);
            }
        } catch (Exception e) {
            throw new Exception(e);
        } finally {
            if (out != null)
                out.close();
            if (br != null)
                br.close();
            if (socket != null)
                socket.close();
        }
        return result;
    }

    // Package the file to send
    private boolean findFile(Socket s) throws Exception {
        System.out.println("Begin to zip the file...");
        File f = FileUtil.doZip(filePath);
        if (f == null) {
            System.err.println("Failed to package the file, cannot continue");
            return false;
        }
        return sendFile(s, f);
    }

    // Send the file
    private boolean sendFile(Socket s, File file) throws Exception {
        DataOutputStream dos = null;
        DataInputStream dis = null;
        try {
            dos = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            long l = file.length();
            String dir = file.getName();
            if (savePath != null && !"".equals(savePath)) {
                dir = savePath + File.separator + file.getName();
            }
            dos.writeUTF(dir);
            dos.writeLong(l);
            dos.flush();
            dis = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
            byte[] bs = new byte[BUFFER_SIZE];
            int length = 0;
            System.out.println("Starting file transfer...");
            while ((length = dis.read(bs)) > 0) {
                sumL += length;
                dos.write(bs, 0, length);
                dos.flush();
            }
            // Closing the stream closes the socket; the receiver reads until end of stream
            dos.close();
            dis.close();
            if (sumL == l) {
                System.out.println("File transfer complete");
                return true;
            }
        } catch (Exception e) {
            throw new Exception("Failed to send file: " + e.getMessage());
        } finally {
            try {
                if (dos != null)
                    dos.close();
                if (dis != null)
                    dis.close();
                if (s != null)
                    s.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("Enter a command (0--restart, 1--replace and restart, 2--stop):");
        Scanner sc = new Scanner(System.in);
        String str = sc.nextLine();
        String filePath = "";
        String savePath = "";
        try {
            new FileSkClient().handle(IP, str, filePath, savePath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Implementation of part 1 on the client machine:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorClient {
    private static final Logger logger = LoggerFactory.getLogger(MonitorClient.class);
    private Runtime rt = Runtime.getRuntime();
    private Socket s = null;
    private PrintWriter w = null;
    private Timer timer = new Timer();
    private static int i = 0; // number of connection retries so far
    private String hostName = InetAddress.getLocalHost().getCanonicalHostName(); // host name
    private static final String configFile = "config.properties"; // file that stores the version number
    private Properties pro = new Properties(); // properties loaded from the configuration file
    private static final int TimeOut = 60000; // interval between status messages, in milliseconds

    public static void main(String[] args) throws Exception {
        new MonitorClient();
    }

    public MonitorClient() throws Exception {
        try {
            pro.load(MonitorClient.class.getResourceAsStream(configFile));
            String IP = pro.getProperty("server.ip");
            String port = pro.getProperty("server.port");
            pro.clear();
            s = new Socket(IP, Integer.parseInt(port));
            s.setKeepAlive(true);
            System.out.println("connect successed!");
            w = new PrintWriter(s.getOutputStream());
            timer.schedule(new Processor(), 1000, TimeOut); // first run after 1 second, then every TimeOut ms
        } catch (Exception e) {
            System.err.println("Run error:" + e.getMessage());
            if (i < 50) {
                // Count the attempt before retrying. Incrementing after the recursive
                // call never took effect, so the retries recursed without limit.
                i++;
                destroyedTimer(); // release this instance's unused timer thread
                new MonitorClient(); // retry
            } else {
                if (w != null)
                    w.close();
                destroyedTimer();
                if (s != null)
                    s.close();
            }
        }
    }

    public void destroyedTimer() {
        if (timer != null) {
            timer.cancel();
        }
    }

    // Builds and sends one status message
    class Processor extends TimerTask {
        public void run() {
            try {
                boolean isAlive = isAlive();
                String version = getVersion();
                StringBuffer sb = new StringBuffer();
                sb.append("name=" + hostName + ",");
                sb.append("ip=" + s.getLocalAddress() + ",");
                sb.append("port=" + s.getLocalPort() + ",");
                sb.append("status=" + isAlive + ",");
                sb.append("version=" + version);
                w.println(sb.toString());
                w.flush();
            } catch (Exception e) {
                e.printStackTrace();
                logger.info("Run error!" + e.getMessage());
            }
        }
    }

    // Whether this is a Windows operating system
    private boolean isWin() {
        String os = System.getProperty("os.name").toLowerCase();
        if (os.startsWith("win")) {
            return true;
        }
        return false;
    }

    /**
     * Whether the monitored process is running.
     *
     * @return true if the process was found
     */
    private boolean isAlive() {
        boolean isAlive = false;
        try {
            if (isWin()) {
                String program = "CWAgen.exe";
                String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (line.indexOf(program) != -1) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            } else {
                // Runtime.exec does not interpret "|", so the pipeline has to run through
                // a shell. "grep -v grep" removes the grep (and sh -c) lines from the count.
                String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (Integer.parseInt(line.trim()) > 0) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isAlive;
    }

    /**
     * Read the version number.
     *
     * @return the value of flume.version, or null if the file cannot be read
     */
    public String getVersion() {
        try {
            pro.load(MonitorClient.class.getResourceAsStream(configFile));
            return pro.getProperty("flume.version");
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } finally {
            pro.clear();
        }
    }
}
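The liveness check shells out to tasklist on Windows and to ps on Linux. On Java 9 or later, the same question can be asked without an external command through the standard ProcessHandle API. This is only a sketch of an alternative and not what the project uses; `ProcessCheck` and `isRunning` are hypothetical names, and on some platforms the command line of other users' processes is not visible, in which case they are simply not matched.

```java
import java.util.Locale;

// Hypothetical helper, requires Java 9 or later.
final class ProcessCheck {

    // True if any other visible process has `needle` in its command line (case-insensitive).
    static boolean isRunning(String needle) {
        String wanted = needle.toLowerCase(Locale.ROOT);
        long self = ProcessHandle.current().pid();
        return ProcessHandle.allProcesses()
                .filter(p -> p.pid() != self) // do not match the monitor itself
                .anyMatch(p -> describe(p).toLowerCase(Locale.ROOT).contains(wanted));
    }

    // Full command line if the OS exposes it, else the executable path, else empty.
    private static String describe(ProcessHandle p) {
        ProcessHandle.Info info = p.info();
        return info.commandLine().orElse(info.command().orElse(""));
    }

    public static void main(String[] args) {
        // "CWAgen" is the process name used in the code above.
        System.out.println("CWAgen running: " + isRunning("CWAgen"));
    }
}
```

The design trade-off is portability against visibility: one code path serves both operating systems, but the result depends on what the operating system lets the JVM see.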
Implementation of part 2 on the client machine:
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class FileSkServer {
    ExecutorService exec = Executors.newCachedThreadPool();
    private static final String SUCCESS_MSG = "SUCCESS"; // success marker
    private static final String FAILED_MSG = "FAILED"; // failure marker
    private static final String REPLACE = "replace"; // command
    private static final String RESTART = "restart"; // command
    private static final String STOP = "stop"; // command
    private static final int BUFFER_SIZE = 1024; // buffer size
    private static final String configFile = "config.properties"; // configuration file
    private Properties pro = new Properties(); // properties loaded from the configuration file
    private String PATH = "";
    private Runtime rt = Runtime.getRuntime();
    private static volatile boolean flag = true; // false while a replace is in progress
    Socket socket = null;

    public FileSkServer() {
        ServerSocket server = null;
        try {
            pro.load(FileSkServer.class.getResourceAsStream(configFile));
            String port = pro.getProperty("fileServer.port");
            server = new ServerSocket(Integer.parseInt(port)); // bind the listening port
            System.out.println("Waiting for client connect……");
            while (flag) {
                socket = server.accept();
                exec.execute(new Handler(socket));
            }
        } catch (IOException e) {
            if (server.isClosed() || socket.isClosed()) {
                new FileSkServer();
            }
        }
    }

    public static void main(String[] args) {
        new FileSkServer();
    }

    // Whether this is a Windows operating system; on Linux this also sets PATH
    private boolean isWin() throws IOException {
        boolean flag = false;
        String os = System.getProperty("os.name").toLowerCase();
        pro.load(FileSkServer.class.getResourceAsStream(configFile));
        if (os.startsWith("win")) {
            flag = true;
        } else {
            PATH = pro.getProperty("flume_home_linux");
        }
        return flag;
    }

    // Detect the architecture (32-bit or 64-bit) and set PATH for Windows
    private boolean is64bit() throws IOException {
        boolean flag = false;
        String os = System.getProperty("os.arch");
        pro.load(FileSkServer.class.getResourceAsStream(configFile));
        if (os.contains("86") || os.contains("32")) {
            PATH = pro.getProperty("flume_home_win_32");
        } else if (os.contains("64")) {
            PATH = pro.getProperty("flume_home_win_64");
            flag = true;
        }
        return flag;
    }

    // Print a child-process stream on a background thread so the child never
    // blocks on a full pipe. Replaces the repeated anonymous reader threads.
    private void drain(final InputStream stream) {
        new Thread() {
            public void run() {
                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }

    // Stop the application
    private boolean stop() {
        System.out.println("Begin to stop this application...");
        try {
            String command = "";
            if (isWin()) {
                is64bit();
                command = "cmd /c " + PATH + "bin\\stopCWAgen.bat start";
            } else {
                command = PATH + "bin/stopCWAgen.sh";
            }
            if (isAlive()) { // stop the running process
                // PATH + "bin" works on both platforms; the original "bin\\" only worked on Windows
                Process p = rt.exec(command, null, new File(PATH + "bin"));
                drain(p.getErrorStream());
                drain(p.getInputStream());
                if (isAlive())
                    kill(); // force-kill the process
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    // Start the application
    private String start() {
        System.out.println("starting run this application...");
        String result = FAILED_MSG;
        try {
            if (!isAlive()) {
                String command = "";
                if (isWin()) {
                    if (is64bit()) {
                        command = "cmd /c " + PATH + "bin\\startCWAgen.bat start";
                    } else {
                        command = "cmd /c " + PATH + "bin\\startCWAgen_32.bat start";
                    }
                } else {
                    command = PATH + "bin/startup.sh";
                }
                Process p = rt.exec(command, null, new File(PATH + "bin"));
                drain(p.getErrorStream());
                drain(p.getInputStream());
                if (isAlive()) {
                    System.out.println("Start successfully!");
                    return SUCCESS_MSG;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Whether the monitored process is running.
     *
     * @return true if the process was found
     */
    private boolean isAlive() {
        boolean isAlive = false;
        try {
            if (isWin()) {
                String program = "CWAgen.exe";
                String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (line.indexOf(program) != -1) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            } else {
                // Runtime.exec does not interpret "|", so the pipeline has to run through
                // a shell. "grep -v grep" removes the grep (and sh -c) lines from the count.
                String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (Integer.parseInt(line.trim()) > 0) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isAlive;
    }

    /**
     * Force-kill the process.
     *
     * @return true if the process is no longer running
     */
    public boolean kill() {
        try {
            System.out.println("Begin to stop this application...");
            if (isAlive()) {
                Process p = null;
                if (isWin()) {
                    String cmd = "taskkill -f -im CWAgen.exe";
                    p = rt.exec(cmd);
                } else {
                    String[] cmd = { "sh", "-c", "ps aux | grep CWAgen" };
                    p = rt.exec(cmd);
                    InputStreamReader is = new InputStreamReader(p.getInputStream());
                    BufferedReader read = new BufferedReader(is);
                    String line = null;
                    while ((line = read.readLine()) != null) {
                        // The marker string is kept exactly as in the original code
                        if (line.indexOf("org.apache.catalina.startup.Bootstrap start") >= 0) {
                            String tomcatPid = line.split("\\s+")[1];
                            rt.exec("kill -9 " + tomcatPid);
                        }
                    }
                    is.close();
                    read.close();
                }
                if (p.waitFor() != 0) {
                    if (p.exitValue() == 1) {
                        System.err.println("Kill failed!");
                    }
                }
                p.destroy();
            }
            if (!isAlive()) {
                System.out.println("Stop successfully!");
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Command handler.
     *
     * @author T430
     */
    class Handler implements Runnable {
        Socket s;
        double sumL = 0;
        PrintWriter w = null;
        InputStream in = null;

        public Handler(Socket s) {
            this.s = s;
        }

        public void run() {
            DataInputStream dis = null;
            BufferedOutputStream bout = null;
            boolean replacing = false; // true if this handler set flag to false
            try {
                String command = null;
                if (flag) {
                    int len = 0;
                    byte[] b = new byte[BUFFER_SIZE];
                    in = s.getInputStream();
                    w = new PrintWriter(s.getOutputStream());
                    if ((len = in.read(b)) != -1) {
                        command = new String(b, 0, len);
                        System.out.println(s.getLocalPort() + "command: " + command);
                    }
                }
                if (REPLACE.equalsIgnoreCase(command)) {
                    replacing = true;
                    flag = false;
                    stop();
                    System.out.println(PATH);
                    w.println("ready");
                    w.flush();
                    dis = new DataInputStream(in);
                    String fileName = dis.readUTF();
                    long fileLength = dis.readLong();
                    String dir = PATH + fileName;
                    File file = new File(dir);
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    byte[] buf = new byte[BUFFER_SIZE];
                    int read = 0;
                    System.out.println("Begin to receive file(" + fileName + "),size(" + fileLength + "B)...");
                    bout = new BufferedOutputStream(new FileOutputStream(file));
                    // Reads until end of stream, which is why the sender must close its side
                    while ((read = dis.read(buf)) != -1) {
                        sumL += read;
                        bout.write(buf, 0, read);
                        bout.flush();
                    }
                    bout.close();
                    dis.close();
                    if (fileLength == sumL) {
                        System.out.println("Received over!The save path:" + file.getPath());
                        w.println("Received over!The save path:" + file.getPath());
                        w.flush();
                        boolean isUnZip = unZip(file, file.getParent()); // unzip the file
                        if (isUnZip) {
                            String result = start();
                            w.println(result);
                            w.flush();
                        } else {
                            w.println("Failed");
                            w.flush();
                        }
                    }
                } else if (RESTART.equalsIgnoreCase(command)) {
                    String result = FAILED_MSG;
                    if (stop()) {
                        result = start();
                    }
                    w.println(result);
                    w.flush();
                } else if (STOP.equalsIgnoreCase(command)) {
                    boolean b = stop();
                    String result = FAILED_MSG;
                    if (b)
                        result = SUCCESS_MSG;
                    w.println(result);
                    w.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Reset the flag even when the replace failed; otherwise it stayed false forever
                if (replacing)
                    flag = true;
                try {
                    if (w != null)
                        w.close();
                    if (dis != null)
                        dis.close();
                    if (bout != null)
                        bout.close();
                    if (s != null)
                        s.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Unzip a file.
         *
         * @param file
         *            the zip file to extract
         * @param destDir
         *            the target directory
         */
        private boolean unZip(File file, String destDir) {
            if (destDir == null)
                destDir = file.getParent();
            if (!file.isFile() || !file.getName().endsWith(".zip")) {
                System.out.println("The file cannot unzip");
                return false;
            }
            try {
                int length;
                byte b[] = new byte[1024];
                // Use the platform separator; a hard-coded "\\" only works on Windows
                destDir = destDir.endsWith(File.separator) ? destDir : destDir + File.separator;
                ZipFile zipFile = new ZipFile(file);
                Enumeration<?> enumeration = zipFile.entries();
                System.out.println("Begin to unzip the file...");
                while (enumeration.hasMoreElements()) {
                    ZipEntry zipEntry = (ZipEntry) enumeration.nextElement();
                    File loadFile = new File(destDir + zipEntry.getName());
                    if (zipEntry.isDirectory()) {
                        // Create directory entries explicitly so that empty folders survive extraction
                        if (!loadFile.exists()) {
                            loadFile.mkdirs();
                        }
                    } else {
                        if (!loadFile.getParentFile().exists()) {
                            loadFile.getParentFile().mkdirs();
                        }
                        // Open and close the streams per entry; closing only once at the end
                        // leaked handles and failed when the archive held no files
                        OutputStream os = new FileOutputStream(loadFile);
                        InputStream is = zipFile.getInputStream(zipEntry);
                        try {
                            while ((length = is.read(b)) > 0) {
                                os.write(b, 0, length);
                            }
                        } finally {
                            is.close();
                            os.close();
                        }
                    }
                }
                zipFile.close();
                file.delete();
                System.out.println("unzip successed");
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    }
}
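The unzip step builds each output path by appending the entry name to the target directory. Since the archive arrives over a plain socket, an entry name such as ../something would be written outside that directory. A minimal sketch of a guard follows, assuming it would be called once per entry before the file is created. `SafeUnzipPath` and `resolveEntry` are hypothetical names, not part of the project.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper for checking zip entry names before extraction.
final class SafeUnzipPath {

    // Resolve an entry name under destDir, rejecting names that escape it.
    static File resolveEntry(File destDir, String entryName) throws IOException {
        File target = new File(destDir, entryName);
        String root = destDir.getCanonicalPath() + File.separator;
        // getCanonicalPath collapses ".." segments, so an escaping entry no longer starts with root
        if (!target.getCanonicalPath().startsWith(root)) {
            throw new IOException("zip entry escapes target directory: " + entryName);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        File dest = new File(System.getProperty("java.io.tmpdir"), "unzip-demo");
        System.out.println(resolveEntry(dest, "conf/flume.properties"));
        try {
            resolveEntry(dest, "../evil.sh");
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the unZip method above this would replace the direct new File(destDir + zipEntry.getName()) call, if that extra safety is wanted.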
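At this point the requirements above are basically met. Next comes the UI on the server side.
It uses Spring MVC. I don't know it very well, so what I wrote is very simple.
Overall the architecture is Spring MVC + MyBatis, the database is DB2, and the pages use EasyUI 1.4 (very powerful; I plan to learn more about it later).
The whole project, including the table-creation statements and the client-machine implementation, is available for download: http://download.csdn.net/detail/tiantang_1986/8142325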