Netty(五)之心跳机制与重连

简介: 文章目标1)实现客户端和服务端的心跳2)心跳多少次没有应答断开处理3)客户端宕机通知服务端4)服务端宕机客户端重连

运行代码下载(亲测有效)


链接:https://pan.baidu.com/s/1YaCQisOfovSCm5xV1XWtlA

提取码:ecc2

复制这段内容后打开百度网盘手机App,操作更方便哦


前提



Netty(一)之helloworld  

的基础之上修改


实现客户端和服务端的心跳


原理


实现心跳有好几种方式,比如客户端给服务端发送心跳,服务端是否回复又是两种;服务端给客户端发送心跳,客户端是否回复又是两种,还有互相发送心跳。


具体哪种方式比较好?www.baidu.com


本文采用是客户端给服务端发送消息,服务端收到心跳请求给服务端相应;服务端多少次没有收到客户端的请求就主动断开连接。


核心是


构造方法
IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime,  TimeUnit unit) 
实例
new IdleStateHandler(3, 3, 3, TimeUnit.SECONDS)


第一个个参数是读超时时间,第二个是写超时时间,第三个事读写超时时间,第四个参数是时间单位


添加了这个Netty框架提供的IdleStateHandler,就会调用Handler链上的userEventTriggered方法实现自己的心跳逻辑


操作步骤


在client添加两个handler,一个心跳机制IdleStatehandler,一个是心跳机制触发器(基础ChannelHandlerAdapter 并且主要实现userEventTriggered方法)


//心跳机制
socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));//添加心跳机制
socketChannel.pipeline().addLast(new TimeClientEventTrigger());//添加心跳机制触发器


创建TimeClientEventTrigger,实现userEventTriggered方法


package demo4;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * @author CBeann
 * @create 2019-09-18 11:45
 */
public class TimeClientEventTrigger extends ChannelHandlerAdapter {
    //读写超时并且添加IdleStateHandler时,会触发这个方法
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //获取超时对象,读超时,写超时还是读写超时
        IdleState state = ((IdleStateEvent) evt).state();
        //如果是读写超时,我这里整的简单了
        if (state.equals(IdleState.ALL_IDLE)) {
            System.out.println("客户端发送心跳");
            //给服务端端发送字符为(HeartBeat-req)的心跳请求
            ctx.writeAndFlush(Unpooled.copiedBuffer("HeartBeat-req".getBytes()));
        }
    }
}


修改TimeServerHandler里的channelRead方法


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //服务器读客户端发送来的数据
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        //如果是心跳请求
        if ("HeartBeat-req".equals(body)) {
            System.out.println("服务端收到心跳,并且相应");
            //服务器向客户端回应请求,关键字为(HeartBeat-resp)
            ByteBuf response = Unpooled.copiedBuffer("HeartBeat-resp".getBytes());
            ctx.writeAndFlush(response);
        } else {
            System.out.println("The TimeServer receive :" + body);
            //服务器向客户端回应请求
            ByteBuf response = Unpooled.copiedBuffer(new Date().toString().getBytes());
            ctx.writeAndFlush(response);
        }
    }


修改TimeClientHandler里的channelRead方法


    //客户端读取服务器发送的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            //如果是服务端回应的心跳
            if ("HeartBeat-resp".equals(body)) {
                System.out.println("服务端回应心跳请求");
            } else {//如果是服务端回应的其它
                System.out.println("Now is:" + body);
            }
        } catch (Exception e) {
        } finally {
            //标配
            ReferenceCountUtil.release(msg);
        }
    }


此时运行的结果为:

客户端发送您好1 ,服务端回应 时间

客户端发送您好2 ,服务端回应 时间

while(1){

客户端发送心跳

服务端接收心跳并且相应

客户端接收心跳相应

}


1.jpg


2.jpg


客户端宕机通知服务端


在Hanlder中实现channelInactive方法就可以实现客户端宕机服务端会及时发现,在这里我在服务端的TimeServerHandler中实现的这个方法


 @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接"+ctx.channel().localAddress().toString());
    }


这个方法客户端暴力关闭会触发服务端的方法;服务端暴力关闭会触发客户端的这个方法。


运行结果:


开启服务端和客户端一起运行,当手动关闭客户端时候,服务端端打印如左下图的最偶一行


1.jpg


2.jpg


心跳多少次没有应答断开处理


如果心跳多少次没有应答,则服务端主动和客户端断开连接


需求为:


客户端的超时为IdleStateHandler(5, 5, 5, TimeUnit.SECONDS),超时给服务端发送心跳消息,

服务端也要添加一个IdleStateHandler,超时记录此数,并且多次超时后断开连接

给TimServer添加

 socketChannel.pipeline().addLast(new IdleStateHandler(1, 1, 1, TimeUnit.SECONDS));//添加心跳机制
 socketChannel.pipeline().addLast(new TimeServerEventTrigger());//添加心跳机制监听器


新建TimeServerEventTrigger


属性为当前超时次数(timeoutNum),运行最大超时次数(timeoutNumMax),超时处理逻辑(userEventTriggered,超时timeoutNumMax断开连接)


package demo4;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * @author CBeann
 * @create 2019-09-20 14:00
 */
public class TimeServerEventTrigger extends ChannelHandlerAdapter {
    //当前超时次数
    private int timeoutNum = 0;
    //运行最大超时次数
    private int timeoutNumMax = 4;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //如果有读操作,将timeoutNum==0
        timeoutNum = 0;
        ctx.fireChannelRead(msg);
    }
    //读写超时发送心跳
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleState state = ((IdleStateEvent) evt).state();
        if (state.equals(IdleState.READER_IDLE)) {
            System.out.println("客户端read超时" + (timeoutNum++) + "次");
            if (timeoutNum > timeoutNumMax) {
                System.out.println("超时次数上限,关闭这个不活跃的连接");
                ctx.channel().close();
            }
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }
}


运行结果:


######################伪代码####################
//客户端
socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));
//服务端
socketChannel.pipeline().addLast(new IdleStateHandler(1, 1, 1, TimeUnit.SECONDS));
//客户端发数据
 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好1".getBytes()));
Thread.sleep(4000);
f.channel().writeAndFlush(Unpooled.copiedBuffer("您好2".getBytes()));


客户端向服务端发送   您好1,然后睡4秒


这时候服务端没有读到数据,而且客户每5秒发送心跳


服务端的心跳检测是1秒,所以打印了3次read超时后,


客户端睡醒,发送了 您好2


服务端的心跳检测是1秒,所以打印了4次read超时后,达到timeoutNumMax上限,关闭连接,调用了channelInactive


3.jpg


4.jpg


服务端宕机客户端重连


这个不清楚,下面的参考超链接里的demo很优秀,实现了服务端宕机客户端多次重连


基本思路,代码看下面参考里的超链接


初步思路,报错调用自己,会超过栈的深度,最后报错


/**
 * Client client = new Client();
 * System.out.println("连接服务器中。。。");
 * client.connect();
 * 问题:这样会出现一直递归,会把栈的深度用完,消耗内存
 */
class Client {
    public void connect() throws Exception {
        try {
            Thread.sleep(2000);
            int i = 1 / 0;
        } catch (Exception e) {
            System.out.println("重新连接服务器中。。。");
            Thread.sleep(2000);
            connect();
        }
    }
}


升级思路 :当报错的时候重新开启一个线程,总有一个线程存在调用connect方法


/**
 *        Client2 client2 = new Client2();
 *         System.out.println("连接服务器中。。。");
 *         client2.connect();
 */
class Client2 {
    public void connect() throws Exception {
        try {
            Thread.sleep(2000);
            int i = 1 / 0;
        } catch (Exception e) {
            System.out.println("重新连接服务器中。。。");
            Thread.sleep(2000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Client2 client2 = new Client2();
                        client2.connect();
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
    }
}


参考


文章里有几个小问题

channelRead0上的  @Override去掉

重新 messageReceived 方法


 protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        System.out.println("client say"+o.toString());
        //重置心跳次数
        counter = 0;
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("message receive");
    }



目录
相关文章
|
3月前
|
监控 网络协议 调度
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
104 0
|
4月前
|
网络协议 调度
Netty心跳检测
客户端的心跳检测对于任何长连接的应用来说,都是一个非常基础的功能。要理解心跳的重要性,首先需要从网络连接假死的现象说起。
|
8月前
|
Nacos
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
106 0
|
JSON 前端开发 安全
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
146 0
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
|
监控 数据可视化 Java
Netty(一) SpringBoot 整合长连接心跳机制(下)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
Netty(一) SpringBoot 整合长连接心跳机制(中)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
监控 Java
Netty(一) SpringBoot 整合长连接心跳机制(上)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
JSON 安全 JavaScript
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
通俗易懂带你完成Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
197 1
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
|
7月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
38 0