Netty实践(四):心跳检测实现

简介:

心跳检测的概念

在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。

wKiom1iWwz-y9e5cAABXb5p25Bs315.png



Netty实现心跳检测代码实例


心跳信息对象

主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package  day4;
 
import  java.io.Serializable;
import  java.util.Date;
import  java.util.HashMap;
import  java.util.Map;
 
/**
  * Created by zhangfengzhe on 2017/2/4.
  */
public  class  HeartInfo  implements  Serializable{
 
     private  String ip;
 
     private  int  port;
 
     private  Date lasttime;
 
     private  Map<String , String> cpuInfo =  new  HashMap<String,String>();
 
     private  Map<String , String> memInfo =  new  HashMap<String, String>();
 
     public  String getIp() {
         return  ip;
     }
 
     public  void  setIp(String ip) {
         this .ip = ip;
     }
 
     public  int  getPort() {
         return  port;
     }
 
     public  void  setPort( int  port) {
         this .port = port;
     }
 
     public  Date getLasttime() {
         return  lasttime;
     }
 
     public  void  setLasttime(Date lasttime) {
         this .lasttime = lasttime;
     }
 
     public  Map<String, String> getCpuInfo() {
         return  cpuInfo;
     }
 
     public  void  setCpuInfo(Map<String, String> cpuInfo) {
         this .cpuInfo = cpuInfo;
     }
 
     public  Map<String, String> getMemInfo() {
         return  memInfo;
     }
 
     public  void  setMemInfo(Map<String, String> memInfo) {
         this .memInfo = memInfo;
     }
 
     @Override
     public  String toString() {
         return  "HeartInfo{"  +
                 "ip='"  + ip + '\ ''  +
                 ", port="  + port +
                 ", lasttime="  + lasttime +
                 ", cpuInfo="  + cpuInfo +
                 ", memInfo="  + memInfo +
                 '}' ;
     }
}


JBoss Marshalling编解码处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package  day3;
 
import  io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import  io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import  io.netty.handler.codec.marshalling.MarshallerProvider;
import  io.netty.handler.codec.marshalling.MarshallingDecoder;
import  io.netty.handler.codec.marshalling.MarshallingEncoder;
import  io.netty.handler.codec.marshalling.UnmarshallerProvider;
 
import  org.jboss.marshalling.MarshallerFactory;
import  org.jboss.marshalling.Marshalling;
import  org.jboss.marshalling.MarshallingConfiguration;
 
/**
  * Marshalling工厂
  */
public  final  class  MarshallingCodeCFactory {
 
     /**
      * 创建Jboss Marshalling解码器MarshallingDecoder
      * @return MarshallingDecoder
      */
     public  static  MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
       final  MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
       //创建了MarshallingConfiguration对象,配置了版本号为5 
       final  MarshallingConfiguration configuration =  new  MarshallingConfiguration();
       configuration.setVersion( 5 );
       //根据marshallerFactory和configuration创建provider
       UnmarshallerProvider provider =  new  DefaultUnmarshallerProvider(marshallerFactory, configuration);
       //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
       MarshallingDecoder decoder =  new  MarshallingDecoder(provider,  1024 );
       return  decoder;
     }
 
     /**
      * 创建Jboss Marshalling编码器MarshallingEncoder
      * @return MarshallingEncoder
      */
     public  static  MarshallingEncoder buildMarshallingEncoder() {
       final  MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
       final  MarshallingConfiguration configuration =  new  MarshallingConfiguration();
       configuration.setVersion( 5 );
       MarshallerProvider provider =  new  DefaultMarshallerProvider(marshallerFactory, configuration);
       //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
       MarshallingEncoder encoder =  new  MarshallingEncoder(provider);
       return  encoder;
     }
}


Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package  day4;
 
import  io.netty.bootstrap.Bootstrap;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioSocketChannel;
 
public  class  Client {
 
    
    public  static  void  main(String[] args)  throws  Exception{
       
       EventLoopGroup group =  new  NioEventLoopGroup();
       Bootstrap b =  new  Bootstrap();
 
       final  int  port =  8765 ;
       final  String serverIP =  "127.0.0.1" ;
 
       b.group(group)
        .channel(NioSocketChannel. class )
        .handler( new  ChannelInitializer<SocketChannel>() {
          @Override
          protected  void  initChannel(SocketChannel sc)  throws  Exception {
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
             sc.pipeline().addLast( new  ClientHandler(port));
          }
       });
       
       ChannelFuture cf = b.connect(serverIP, port).sync();
 
       cf.channel().closeFuture().sync();
       group.shutdownGracefully();
    }
}


Client Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package  day4;
 
import  io.netty.channel.ChannelHandlerAdapter;
import  io.netty.channel.ChannelHandlerContext;
import  io.netty.util.ReferenceCountUtil;
 
import  java.net.InetAddress;
import  java.net.UnknownHostException;
import  java.util.concurrent.Executors;
import  java.util.concurrent.ScheduledExecutorService;
import  java.util.concurrent.ScheduledFuture;
import  java.util.concurrent.TimeUnit;
 
/**
  * Created by zhangfengzhe on 2017/2/4.
  */
public  class  ClientHandler  extends  ChannelHandlerAdapter {
 
     private  String ip;
     private  int  port;
     private  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool( 1 );
     private  ScheduledFuture<?> scheduledFuture;
 
     private  static  final  String SUCCESS =  "OK" ;
 
     public  ClientHandler(){}
 
     public  ClientHandler( int  port) {
 
         this .port = port;
 
         //获取本机IP
         try  {
             this .ip = InetAddress.getLocalHost().getHostAddress();
         catch  (UnknownHostException e) {
             e.printStackTrace();
         }
 
     }
 
     //通道建立初始化时  发送信息 准备握手验证
     @Override
     public  void  channelActive(ChannelHandlerContext ctx)  throws  Exception {
 
         String authInfo =  this .ip +  ":"  this .port;
 
         ctx.writeAndFlush(authInfo);
     }
 
     //当服务器发送认证信息后,开始启动心跳发送
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg)  throws  Exception {
 
         if (msg  instanceof  String){
 
             //认证成功
             if (SUCCESS.equals((String)msg)){
 
                 this .scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay( new  HeartTask(ctx,ip,port), 2 , 3 , TimeUnit.SECONDS);
 
             } else {
 
                 System.out.println( "服务器发来消息:"  + msg);
             }
 
         }
 
         ReferenceCountUtil.release(msg);
 
     }
 
     //如果出现异常 取消定时
     @Override
     public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  throws  Exception {
 
         cause.printStackTrace();
         if ( this .scheduledFuture !=  null ){
             this .scheduledFuture.cancel( true );
             this .scheduledFuture =  null ;
         }
 
     }
}

Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。


心跳任务HeartTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package  day4;
 
import  io.netty.channel.ChannelHandlerContext;
import  org.hyperic.sigar.CpuPerc;
import  org.hyperic.sigar.Mem;
import  org.hyperic.sigar.Sigar;
 
import  java.util.Date;
import  java.util.Random;
 
/**
  * Created by zhangfengzhe on 2017/2/4.
  */
public  class  HeartTask  implements   Runnable{
 
     //持有引用,方便读写操作
     private  ChannelHandlerContext ctx;
 
     private  HeartInfo heartInfo =  new  HeartInfo();
 
     public  HeartTask(ChannelHandlerContext ctx, String ip,  int  port) {
 
         this .ctx = ctx;
 
         heartInfo.setIp(ip);
         heartInfo.setPort(port);
     }
 
     @Override
     public  void  run() {
 
         try {
             //利用sigar获取 内存/CPU方面的信息 ; 利用CTX给服务器端发送消息
             Sigar sigar =  new  Sigar();
 
             //内存使用信息memory
             Mem mem = sigar.getMem();
             heartInfo.getMemInfo().put( "total" ,String.valueOf(mem.getTotal()));
             heartInfo.getMemInfo().put( "used" ,String.valueOf(mem.getUsed()));
             heartInfo.getMemInfo().put( "free" ,String.valueOf(mem.getFree()));
 
 
             //CPU使用信息
             CpuPerc cpuPerc = sigar.getCpuPerc();
             heartInfo.getCpuInfo().put( "user" ,String.valueOf(cpuPerc.getUser()));
             heartInfo.getCpuInfo().put( "sys" ,String.valueOf(cpuPerc.getSys()));
             heartInfo.getCpuInfo().put( "wait" ,String.valueOf(cpuPerc.getWait()));
             heartInfo.getCpuInfo().put( "idle" ,String.valueOf(cpuPerc.getIdle()));
 
             heartInfo.setLasttime( new  Date());
 
             ctx.writeAndFlush(heartInfo);
 
         } catch  (Exception e){
             e.printStackTrace();
         }
     }
}

首先,为了方便在心跳任务中进行读写操作,HeartTask持有ChannelHandlerContext的引用。其次,为了方便收集系统的内存、CPU信息,这里使用了Sigar,也是在实际中引用非常广泛的一个工具。


Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package  day4;
 
import  io.netty.bootstrap.ServerBootstrap;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.ChannelOption;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioServerSocketChannel;
import  io.netty.handler.logging.LogLevel;
import  io.netty.handler.logging.LoggingHandler;
 
public  class  Server {
 
    public  static  void  main(String[] args)  throws  Exception{
       
       EventLoopGroup pGroup =  new  NioEventLoopGroup();
       EventLoopGroup cGroup =  new  NioEventLoopGroup();
       
       ServerBootstrap b =  new  ServerBootstrap();
       b.group(pGroup, cGroup)
        .channel(NioServerSocketChannel. class )
        .option(ChannelOption.SO_BACKLOG,  1024 )
        //设置日志
        .handler( new  LoggingHandler(LogLevel.INFO))
        .childHandler( new  ChannelInitializer<SocketChannel>() {
          protected  void  initChannel(SocketChannel sc)  throws  Exception {
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
             sc.pipeline().addLast( new  ServerHandler());
          }
       });
       
       ChannelFuture cf = b.bind( 8765 ).sync();
       
       cf.channel().closeFuture().sync();
       pGroup.shutdownGracefully();
       cGroup.shutdownGracefully();
       
    }
}


Server Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package  day4;
 
import  io.netty.channel.ChannelHandlerAdapter;
import  io.netty.channel.ChannelHandlerContext;
 
import  java.util.ArrayList;
import  java.util.HashMap;
import  java.util.List;
import  java.util.Map;
 
/**
  * Created by zhangfengzhe on 2017/2/4.
  */
public  class  ServerHandler  extends  ChannelHandlerAdapter {
 
     //KEY: ip:port VALUE: HeartInfo
     private  Map<String,HeartInfo> heartInfoMap =  new  HashMap<String, HeartInfo>();
 
     private  static  final  List<String> authList =  new  ArrayList<String>();
 
     static  {
         //从其他地方加载出来的IP列表
         authList.add( "192.168.99.219:8765" );
     }
 
 
     //服务器会接收到2种消息 一个是客户端初始化时发送过来的认证信息 第二个是心跳信息
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg)  throws  Exception {
 
         if (msg  instanceof  String){
 
             if (authList.contains(msg)){  //验证通过
                 ctx.writeAndFlush( "OK" );
             } else {
                 ctx.writeAndFlush( "不在认证列表中..." );
             }
 
         } else  if (msg  instanceof  HeartInfo){
 
             System.out.println((HeartInfo)msg);
 
             ctx.writeAndFlush( "心跳接收成功!" );
 
             HeartInfo heartInfo = (HeartInfo)msg;
             heartInfoMap.put(heartInfo.getIp() +  ":"  + heartInfo.getPort(),heartInfo);
         }
 
     }
}


运行结果


Client端

wKioL1iWyKajZ8I3AABV5IWr6XU837.png


Server端

wKiom1iWyLPTcWUcAABo90sCbsE792.png


到这里,心跳检测就实现了,就这么简单,你会了么,See U~


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1895031,如需转载请自行联系原作者

相关文章
|
6天前
|
监控 网络协议 调度
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
139 0
|
6天前
|
网络协议 调度
Netty心跳检测
客户端的心跳检测对于任何长连接的应用来说,都是一个非常基础的功能。要理解心跳的重要性,首先需要从网络连接假死的现象说起。
|
9月前
|
Nacos
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
116 0
Netty(五)之心跳机制与重连
文章目标 1)实现客户端和服务端的心跳 2)心跳多少次没有应答断开处理 3)客户端宕机通知服务端 4)服务端宕机客户端重连
123 0
Netty(五)之心跳机制与重连
|
网络协议 API 调度
Netty「源码分析」之 Idle 检测
Netty「源码分析」之 Idle 检测
181 0
|
JSON 前端开发 安全
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
151 0
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
|
监控 数据可视化 Java
Netty(一) SpringBoot 整合长连接心跳机制(下)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
Netty(一) SpringBoot 整合长连接心跳机制(中)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
监控 Java
Netty(一) SpringBoot 整合长连接心跳机制(上)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
8月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
43 0