开发者社区> 科技小先锋> 正文

Netty实践(四):心跳检测实现

简介:
+关注继续查看

心跳检测的概念

在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。

wKiom1iWwz-y9e5cAABXb5p25Bs315.png



Netty实现心跳检测代码实例


心跳信息对象

主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package day4;
 
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class HeartInfo implements Serializable{
 
    private String ip;
 
    private int port;
 
    private Date lasttime;
 
    private Map<String , String> cpuInfo = new HashMap<String,String>();
 
    private Map<String , String> memInfo = new HashMap<String, String>();
 
    public String getIp() {
        return ip;
    }
 
    public void setIp(String ip) {
        this.ip = ip;
    }
 
    public int getPort() {
        return port;
    }
 
    public void setPort(int port) {
        this.port = port;
    }
 
    public Date getLasttime() {
        return lasttime;
    }
 
    public void setLasttime(Date lasttime) {
        this.lasttime = lasttime;
    }
 
    public Map<String, String> getCpuInfo() {
        return cpuInfo;
    }
 
    public void setCpuInfo(Map<String, String> cpuInfo) {
        this.cpuInfo = cpuInfo;
    }
 
    public Map<String, String> getMemInfo() {
        return memInfo;
    }
 
    public void setMemInfo(Map<String, String> memInfo) {
        this.memInfo = memInfo;
    }
 
    @Override
    public String toString() {
        return "HeartInfo{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", lasttime=" + lasttime +
                ", cpuInfo=" + cpuInfo +
                ", memInfo=" + memInfo +
                '}';
    }
}


JBoss Marshalling编解码处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package day3;
 
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
 
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
 
/**
 * Marshalling工厂
 */
public final class MarshallingCodeCFactory {
 
    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
       //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      //创建了MarshallingConfiguration对象,配置了版本号为5 
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      //根据marshallerFactory和configuration创建provider
      UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
      MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
      return decoder;
    }
 
    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
      MarshallingEncoder encoder = new MarshallingEncoder(provider);
      return encoder;
    }
}


Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package day4;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
public class Client {
 
    
   public static void main(String[] args) throws Exception{
       
      EventLoopGroup group = new NioEventLoopGroup();
      Bootstrap b = new Bootstrap();
 
      final int port = 8765;
      final String serverIP = "127.0.0.1";
 
      b.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ClientHandler(port));
         }
      });
       
      ChannelFuture cf = b.connect(serverIP, port).sync();
 
      cf.channel().closeFuture().sync();
      group.shutdownGracefully();
   }
}


Client Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package day4;
 
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
 
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
 
/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class ClientHandler extends ChannelHandlerAdapter {
 
    private String ip;
    private int port;
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> scheduledFuture;
 
    private static final String SUCCESS = "OK";
 
    public ClientHandler(){}
 
    public ClientHandler(int port) {
 
        this.port = port;
 
        //获取本机IP
        try {
            this.ip = InetAddress.getLocalHost().getHostAddress();
        catch (UnknownHostException e) {
            e.printStackTrace();
        }
 
    }
 
    //通道建立初始化时  发送信息 准备握手验证
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
 
        String authInfo = this.ip + ":" this.port;
 
        ctx.writeAndFlush(authInfo);
    }
 
    //当服务器发送认证信息后,开始启动心跳发送
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
        if(msg instanceof String){
 
            //认证成功
            if(SUCCESS.equals((String)msg)){
 
                this.scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(new HeartTask(ctx,ip,port),2,3, TimeUnit.SECONDS);
 
            }else{
 
                System.out.println("服务器发来消息:" + msg);
            }
 
        }
 
        ReferenceCountUtil.release(msg);
 
    }
 
    //如果出现异常 取消定时
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 
        cause.printStackTrace();
        if(this.scheduledFuture != null){
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }
 
    }
}

Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。


心跳任务HeartTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package day4;
 
import io.netty.channel.ChannelHandlerContext;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
 
import java.util.Date;
import java.util.Random;
 
/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class HeartTask implements  Runnable{
 
    //持有引用,方便读写操作
    private ChannelHandlerContext ctx;
 
    private HeartInfo heartInfo = new HeartInfo();
 
    public HeartTask(ChannelHandlerContext ctx, String ip, int port) {
 
        this.ctx = ctx;
 
        heartInfo.setIp(ip);
        heartInfo.setPort(port);
    }
 
    @Override
    public void run() {
 
        try{
            //利用sigar获取 内存/CPU方面的信息 ; 利用CTX给服务器端发送消息
            Sigar sigar = new Sigar();
 
            //内存使用信息memory
            Mem mem = sigar.getMem();
            heartInfo.getMemInfo().put("total",String.valueOf(mem.getTotal()));
            heartInfo.getMemInfo().put("used",String.valueOf(mem.getUsed()));
            heartInfo.getMemInfo().put("free",String.valueOf(mem.getFree()));
 
 
            //CPU使用信息
            CpuPerc cpuPerc = sigar.getCpuPerc();
            heartInfo.getCpuInfo().put("user",String.valueOf(cpuPerc.getUser()));
            heartInfo.getCpuInfo().put("sys",String.valueOf(cpuPerc.getSys()));
            heartInfo.getCpuInfo().put("wait",String.valueOf(cpuPerc.getWait()));
            heartInfo.getCpuInfo().put("idle",String.valueOf(cpuPerc.getIdle()));
 
            heartInfo.setLasttime(new Date());
 
            ctx.writeAndFlush(heartInfo);
 
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

首先,为了方便在心跳任务中进行读写操作,HeartTask持有ChannelHandlerContext的引用。其次,为了方便收集系统的内存、CPU信息,这里使用了Sigar,也是在实际中引用非常广泛的一个工具。


Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package day4;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
public class Server {
 
   public static void main(String[] args) throws Exception{
       
      EventLoopGroup pGroup = new NioEventLoopGroup();
      EventLoopGroup cGroup = new NioEventLoopGroup();
       
      ServerBootstrap b = new ServerBootstrap();
      b.group(pGroup, cGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 1024)
       //设置日志
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new ChannelInitializer<SocketChannel>() {
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ServerHandler());
         }
      });
       
      ChannelFuture cf = b.bind(8765).sync();
       
      cf.channel().closeFuture().sync();
      pGroup.shutdownGracefully();
      cGroup.shutdownGracefully();
       
   }
}


Server Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package day4;
 
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class ServerHandler extends ChannelHandlerAdapter {
 
    //KEY: ip:port VALUE: HeartInfo
    private Map<String,HeartInfo> heartInfoMap = new HashMap<String, HeartInfo>();
 
    private static final List<String> authList = new ArrayList<String>();
 
    static {
        //从其他地方加载出来的IP列表
        authList.add("192.168.99.219:8765");
    }
 
 
    //服务器会接收到2种消息 一个是客户端初始化时发送过来的认证信息 第二个是心跳信息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
        if(msg instanceof String){
 
            if(authList.contains(msg)){ //验证通过
                ctx.writeAndFlush("OK");
            }else{
                ctx.writeAndFlush("不在认证列表中...");
            }
 
        }else if(msg instanceof HeartInfo){
 
            System.out.println((HeartInfo)msg);
 
            ctx.writeAndFlush("心跳接收成功!");
 
            HeartInfo heartInfo = (HeartInfo)msg;
            heartInfoMap.put(heartInfo.getIp() + ":" + heartInfo.getPort(),heartInfo);
        }
 
    }
}


运行结果


Client端

wKioL1iWyKajZ8I3AABV5IWr6XU837.png


Server端

wKiom1iWyLPTcWUcAABo90sCbsE792.png


到这里,心跳检测就实现了,就这么简单,你会了么,See U~


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1895031,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Netty(五)之心跳机制与重连
文章目标 1)实现客户端和服务端的心跳 2)心跳多少次没有应答断开处理 3)客户端宕机通知服务端 4)服务端宕机客户端重连
25 0
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
82 0
Netty(一) SpringBoot 整合长连接心跳机制(下)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
411 0
Netty(一) SpringBoot 整合长连接心跳机制(中)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
265 0
Netty(一) SpringBoot 整合长连接心跳机制(上)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
564 0
Netty功能实现:实现心跳检测
Netty功能实现:实现心跳检测
164 0
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
通俗易懂带你完成Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
150 0
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》
在我们使用netty中,需要监测服务是否稳定以及在网络异常链接断开时候可以自动重连。需要实现监听;f.addListener(new MyChannelFutureListener()
97 0
Netty 中的心跳机制,还有谁不会?
我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。
76 0
Netty 如何实现心跳机制与断线重连?
心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
237 0
+关注
科技小先锋
文章
问答
视频
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载