socket监控进程,并对程序执行有关操作。

简介: socket监控进程,并对程序执行有关操作。

之前写了一篇《socket之flume监控系统》,但由于生产环境上的端口限制,so要做一些优化和改善。那这篇就是前作的优化篇吧。

需求是:1、每隔1分钟获取客户端所运行的application的运行状态,这里采用的是通过命令获取该application的进程。客户机有AIX、LINUX、WINDOWS三种平台,AIX跟LINUX大同小异。

2、不定时给客户端发送指令,包括重启、关闭application,更新application的文件。


废话不多说,贴代码。

客户端代码:

public class MonitorClient {
  private static Timer timer = new Timer();
  private static String configFile = "config.properties";// 配置文件
  private static Properties pro = null;// 加载属性文件,读取数据库连接配置信息
  private long timeout = 60000;// 发送数据间隔时间
  private static final String SUCCESS_MSG = "SUCCESS";// 成功标识符
  private static final String FAILED_MSG = "FAILED";// 失败标识符
  private static final String REPLACE = "replace";// 操作命令
  private static final String RESTART = "restart";// 操作命令
  private static final String STOP = "stop";// 操作命令
  private DateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  public MonitorClient(File file) {
    loadConfig(file);
    String time = pro.getProperty("timeout");
    if (time != null)
      timeout = Long.valueOf(time);
    timer.schedule(new Handler(), 1000, timeout);
  }
  public void loadConfig(File file) {//加载配置文件
    try {
      if(pro==null){
        pro = new Properties();
        FileInputStream fis = new FileInputStream(file);
        pro.load(fis);
        fis.close();
      }
    } catch (FileNotFoundException e) {
      System.err.println("The file [" + file.getName() + "] not found!");
      System.exit(0);
    } catch (IOException e) {
      e.printStackTrace();
      System.exit(0);
    }
  }
  class Handler extends TimerTask {//操作类
    private Socket s;
    private OutputStream out;
    private PrintWriter w;
    
    @Override
    public void run() {
      try {
        String ip = pro.getProperty("server.ip");
        String port = "9099";
        if (pro.getProperty("server.port") != null)
          port = pro.getProperty("server.port");
        s = new Socket(ip, Integer.parseInt(port));
        String connectTime=df.format(new Date());
        System.out.println("-------connect server at "+connectTime+" -------");
        s.setOOBInline(false);
        s.setSoTimeout(60000);
        out = s.getOutputStream();
        w = new PrintWriter(out,true);
        String msg = sendMsg(s);//发送application状态
        w.println(msg);
        w.flush();
        InputStream in = s.getInputStream();
        int _count = 0; 
        while (_count == 0) { 
            _count = in.available(); 
        }
        byte[] _b = new byte[_count]; 
        int _len= in.read(_b);//获取服务端反馈及指令
        String command=new String(_b,0,_len).trim();
        if("ok".equalsIgnoreCase(command)) {//打印反馈结果
          System.out.println("send message success!");
        }else if(REPLACE.equalsIgnoreCase(command)) {//执行更新文件命令
          System.out.println("execute replace...");
          int sumL = 0;
          OperateUtil.stop();<span style="font-family: Arial, Helvetica, sans-serif;">//关闭application</span>
          DataInputStream dis = new DataInputStream(in);
          String name = dis.readUTF();
          long length = dis.readLong();
          String flumePath=OperateUtil.getPath();
          File file = new File(flumePath+name);
          if (!file.exists()) {
            file.createNewFile();
          }
          byte[] buf = new byte[1024];
          int read = 0;
          BufferedOutputStream bout = new BufferedOutputStream(
              new FileOutputStream(file));
          System.out.println("start read file...");
          while(sumL<length){//接收文件
            read = dis.read(buf);
            sumL += read;
            bout.write(buf, 0, read);
            bout.flush();
          }
          bout.close();
          if (length == sumL){
            System.out.println("done! the file size is " + sumL + " kb");
            String fileName=file.getName();
            String filePath=file.getParent();
            System.out.println("file path:"+filePath);
            if(fileName.endsWith(".zip")){//如果是zip文件,需要解压
              FileUtil.unZip(file, filePath);
              if(file.isFile()&&file.exists()){
                boolean flag = file.getAbsoluteFile().delete();
                System.out.println("The file[" + fileName + "]delete success:" + flag);
              }
            }
            String result = OperateUtil.start();//启动application
            w.println(result);
            w.flush();
          }
          dis.close();
        } else if (RESTART.equalsIgnoreCase(command)) {//执行启动命令
          System.out.println("execute restart...");
          String result = FAILED_MSG;
          OperateUtil.stop();
          result = OperateUtil.start();
          w.println(result);
          w.flush();
        } else if (STOP.equalsIgnoreCase(command)) {//执行停止命令
          System.out.println("execute stop...");
          String result = FAILED_MSG;
          if (OperateUtil.stop()) {
            result = SUCCESS_MSG;
          }
          w.println(result);
          w.flush();
        }
        if (in != null)
          in.close();
        if (out != null)
          out.close();
        if (w != null)
          w.close();
        s.close();
        if (s.isClosed()) {
          System.out.println("The connect has disconnected");
        }
      } catch (IOException e) {
        if(e.getMessage().indexOf("Connection refused: connect")!=-1){
          System.err.println("disconnected or server not running!");
        }else{
          e.printStackTrace();
        }
        try {
          s.close();
          timer.cancel();
        } catch (IOException e1) {
          e1.printStackTrace();
        }
        System.exit(0);
      }
    }
    private String sendMsg(Socket s){//发送消息
      StringBuffer sb = new StringBuffer();
      String hostName = "";
      try{
        hostName = InetAddress.getLocalHost().getCanonicalHostName();
      }catch (UnknownHostException e) {
        System.err.println("Get local host failed!"+e.getMessage());
      }
      boolean isAlive = OperateUtil.isAlive();
      String status="0";
      if (isAlive)status = "1";
      String ip=s.getLocalAddress().toString().replaceAll("/", "");
      String version = OperateUtil.getVersion();
      sb.append("name=" + hostName + ",");
      sb.append("ip=" + ip + ",");
      sb.append("port=" + s.getLocalPort() + ",");
      sb.append("status=" + status + ",");
      sb.append("version=" + version);
      
      return sb.toString();
    }
  }
  public static void main(String[] args) {
    String fileName=configFile;
    if(args.length>0)
      fileName=args[0];
    //System.out.println("==========fileName:"+fileName);
    File file=new File(fileName);
    new MonitorClient(file);
  }
}


服务端代码:

/**
 * 接收客户端application状态信息存进表格。先查询是否有机器的记录,如果有使用update,否则使用insert
 * 
 * @author T430
 * 
 */
public class MonitorSocket {
  private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>();
  private ExecutorService handler = Executors.newFixedThreadPool(20);
  private ExecutorService executor = Executors.newCachedThreadPool();
  public static Hashtable<String, String> commandList = new Hashtable<String, String>();
  public static Hashtable<String, String> resultList = new Hashtable<String, String>();
  private static final String SUCCESS_MSG = "SUCCESS";// 成功标识符
  private static final String FAILED_MSG = "FAILED";// 失败标识符
  private ServerSocket server;
  private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  public MonitorSocket(ServletContext servletContext) {
    executor.execute(new Handler(servletContext));
    executor.execute(new Task(queue));
  }
  public static Hashtable<String, String> getCommandList() {
    return commandList;
  }
  public static Hashtable<String, String> getResultList() {
    return resultList;
  }
  public static synchronized void setCommandList(Hashtable<String, String> commandList) {
    MonitorSocket.commandList = commandList;
  }
  class Handler implements Runnable {//操作类
    private ServletContext servletContext;
    public Handler(ServletContext servletContext) {
      this.servletContext = servletContext;
    }
    public void run() {
      Socket s = null;
      DataOutputStream dos = null;
      try {
        String portStr = this.servletContext.getInitParameter("socketPort");
        int port = Integer.parseInt(portStr);
        server = new ServerSocket(port);
        System.out.println("Waiting for client connect……");
        while (true) {
          s = server.accept();
          String connectTime = df.format(new Date());
          String clien = s.getInetAddress().toString().replaceAll("/", "");
          System.out.println("------" + clien + " connect at " + connectTime + " ------");
          s.setSoTimeout(60000);
          InputStream in = s.getInputStream();
          OutputStream out = s.getOutputStream();
          PrintWriter w = new PrintWriter(out, true);
          BufferedReader br = new BufferedReader(new InputStreamReader(in));
          String msg = br.readLine();
          System.out.println("client msg:" + msg);
          if (msg.length() > 0 && !SUCCESS_MSG.equals(msg) && !FAILED_MSG.equals(msg)) {
            putQueue(msg);
          }
          if (!commandList.isEmpty()) {
            String command = commandList.get(clien);
            if (command != null) {
              commandList.remove(clien);
              String[] commands = command.split(";");
              String cmd = commands[1];
              System.out.println("server command:" + cmd);
              if (cmd.equals("0")) {//发送重启指令
                w.println("restart");
                w.flush();
              } else if (cmd.equals("1")) {//发送更新文件指令
                w.println("replace");
                w.flush();
                String fileName = commands[2];
                File file = new File(fileName);
                if (!file.exists()) {
                  System.out.println("file[" + fileName + "]not found");
                  in.close();
                  out.close();
                  w.close();
                  br.close();
                  s.close();
                  return;
                }
                BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
                dos = new DataOutputStream(out);
                String name = file.getName();
                long length = file.length();
                String dir = commands[3] + File.separator + name;
                System.out.println("dir=" + dir);
                dos.writeUTF(dir);//发送客户端文件存放地址
                dos.writeLong(length);//文件大小
                dos.flush();
                int len = 0;
                byte[] buf = new byte[1024];
                while ((len = bis.read(buf)) != -1) {//发送文件
                  dos.write(buf, 0, len);
                  dos.flush();
                }
                bis.close();
              } else if (cmd.equals("2")) {//发送关闭指令
                w.println("stop");
                w.flush();
              }
              int count = 0;
              while (count == 0) {
                count = in.available();
              }
              byte[] b = new byte[count];
              int len = in.read(b);//获取客户端反馈结果
              String result = new String(b, 0, len).trim();
              System.out.println("execute result:" + result);
              resultList.put(clien, result);
            }
          }
          w.print("ok");
          w.flush();
          Thread.currentThread().sleep(1000);
          try {
            do {
              //发送一条特殊的信息,客户端设置setOOBInline(false),则会忽略;
              //如果客户端已经close的话,话报错,用来判断客户端是否close
              s.sendUrgentData(0xFF);
            } while (!s.isClosed());
          } catch (IOException e) {
            if (dos != null)
              dos.close();
            br.close();
            out.close();
            w.close();
            s.close();
            if (s.isClosed()) {
              System.out.println("The connect has disconnected");
            }
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
  private void putQueue(String msg) throws Exception {
    String createTime = df.format(new Date());
    String[] strs = msg.split(",");
    FlumeInfo info = new FlumeInfo();
    for (String str : strs) {
      String[] texts = str.split("=");
      String s = texts[0];
      if ("name".equals(s)) {
        info.setName(texts[1]);
      } else if ("ip".equals(s)) {
        info.setIP(texts[1]);
      } else if ("port".equals(s)) {
        info.setPort(texts[1]);
      } else if ("status".equals(s)) {
        String status = texts[1];
        info.setStatus(status);
      } else if ("version".equals(s)) {
        info.setVersion(texts[1]);
      }
    }
    info.setCode("code");
    info.setCreateTime(createTime);
    queue.put(info);
  }
  public void closeServerSocket() {
    try {
      server.close();
      executor.shutdown();
    } catch (IOException e) {
      System.err.println("close server error");
      e.printStackTrace();
    }
  }
}




有的操作类省略了。大家可以到这里去下载完整的代码:

客户端:去下载

服务端:去下载


相关文章
|
7天前
|
缓存 监控 调度
第六十一章 使用 ^PERFSAMPLE 监控进程 - 分析维度
第六十一章 使用 ^PERFSAMPLE 监控进程 - 分析维度
15 0
|
15天前
|
弹性计算 Dubbo Serverless
Serverless 应用引擎操作报错合集之阿里函数计算中,生成图片时进程卡住如何解决
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
7天前
|
监控
第六十章 使用 ^PERFSAMPLE 监控进程 - 预定义分析示例
第六十章 使用 ^PERFSAMPLE 监控进程 - 预定义分析示例
11 0
|
7天前
|
监控 Go
第五十九章 使用 ^PERFSAMPLE 监控进程 - 收集样本
第五十九章 使用 ^PERFSAMPLE 监控进程 - 收集样本
14 0
|
7天前
|
存储 算法 Linux
【Linux】程序地址空间 -- 详解 & Linux 2.6 内核进程调度队列 -- 了解
【Linux】程序地址空间 -- 详解 & Linux 2.6 内核进程调度队列 -- 了解
|
8天前
|
Linux Shell 程序员
【进程控制】进程程序替换的原理以及exec函数族
【进程控制】进程程序替换的原理以及exec函数族
|
8天前
|
监控 网络协议 iOS开发
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
程序退到后台的时候,所有线程被挂起,系统回收所有的socket资源问题及解决方案
26 0
|
15天前
|
Linux C++
【Linux】详解进程程序替换
【Linux】详解进程程序替换
|
15天前
|
算法 Linux 调度
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
109 1
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
|
15天前
|
运维 监控 Ubuntu
Python实现ubuntu系统进程内存监控
Python实现ubuntu系统进程内存监控
18 1