运行代码下载(亲测有效)
链接:https://pan.baidu.com/s/1YaCQisOfovSCm5xV1XWtlA
提取码:ecc2
复制这段内容后打开百度网盘手机App,操作更方便哦
前提
在
Netty(一)之helloworld
的基础之上修改
实现客户端和服务端的心跳
原理
实现心跳有好几种方式,比如客户端给服务端发送心跳,服务端是否回复又是两种;服务端给客户端发送心跳,客户端是否回复又是两种,还有互相发送心跳。
具体哪种方式比较好?www.baidu.com
本文采用是客户端给服务端发送消息,服务端收到心跳请求给服务端相应;服务端多少次没有收到客户端的请求就主动断开连接。
核心是
构造方法 IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) 实例 new IdleStateHandler(3, 3, 3, TimeUnit.SECONDS)
第一个个参数是读超时时间,第二个是写超时时间,第三个事读写超时时间,第四个参数是时间单位
添加了这个Netty框架提供的IdleStateHandler,就会调用Handler链上的userEventTriggered方法实现自己的心跳逻辑
操作步骤
在client添加两个handler,一个心跳机制IdleStatehandler,一个是心跳机制触发器(基础ChannelHandlerAdapter 并且主要实现userEventTriggered方法)
//心跳机制 socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));//添加心跳机制 socketChannel.pipeline().addLast(new TimeClientEventTrigger());//添加心跳机制触发器
创建TimeClientEventTrigger,实现userEventTriggered方法
package demo4; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * @author CBeann * @create 2019-09-18 11:45 */ public class TimeClientEventTrigger extends ChannelHandlerAdapter { //读写超时并且添加IdleStateHandler时,会触发这个方法 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //获取超时对象,读超时,写超时还是读写超时 IdleState state = ((IdleStateEvent) evt).state(); //如果是读写超时,我这里整的简单了 if (state.equals(IdleState.ALL_IDLE)) { System.out.println("客户端发送心跳"); //给服务端端发送字符为(HeartBeat-req)的心跳请求 ctx.writeAndFlush(Unpooled.copiedBuffer("HeartBeat-req".getBytes())); } } }
修改TimeServerHandler里的channelRead方法
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //服务器读客户端发送来的数据 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); //如果是心跳请求 if ("HeartBeat-req".equals(body)) { System.out.println("服务端收到心跳,并且相应"); //服务器向客户端回应请求,关键字为(HeartBeat-resp) ByteBuf response = Unpooled.copiedBuffer("HeartBeat-resp".getBytes()); ctx.writeAndFlush(response); } else { System.out.println("The TimeServer receive :" + body); //服务器向客户端回应请求 ByteBuf response = Unpooled.copiedBuffer(new Date().toString().getBytes()); ctx.writeAndFlush(response); } }
修改TimeClientHandler里的channelRead方法
//客户端读取服务器发送的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); //如果是服务端回应的心跳 if ("HeartBeat-resp".equals(body)) { System.out.println("服务端回应心跳请求"); } else {//如果是服务端回应的其它 System.out.println("Now is:" + body); } } catch (Exception e) { } finally { //标配 ReferenceCountUtil.release(msg); } }
此时运行的结果为:
客户端发送您好1 ,服务端回应 时间
客户端发送您好2 ,服务端回应 时间
while(1){
客户端发送心跳
服务端接收心跳并且相应
客户端接收心跳相应
}
客户端宕机通知服务端
在Hanlder中实现channelInactive方法就可以实现客户端宕机服务端会及时发现,在这里我在服务端的TimeServerHandler中实现的这个方法
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端断开链接"+ctx.channel().localAddress().toString()); }
这个方法客户端暴力关闭会触发服务端的方法;服务端暴力关闭会触发客户端的这个方法。
运行结果:
开启服务端和客户端一起运行,当手动关闭客户端时候,服务端端打印如左下图的最偶一行
心跳多少次没有应答断开处理
如果心跳多少次没有应答,则服务端主动和客户端断开连接
需求为:
客户端的超时为IdleStateHandler(5, 5, 5, TimeUnit.SECONDS),超时给服务端发送心跳消息,
服务端也要添加一个IdleStateHandler,超时记录此数,并且多次超时后断开连接
给TimServer添加
socketChannel.pipeline().addLast(new IdleStateHandler(1, 1, 1, TimeUnit.SECONDS));//添加心跳机制 socketChannel.pipeline().addLast(new TimeServerEventTrigger());//添加心跳机制监听器
新建TimeServerEventTrigger
属性为当前超时次数(timeoutNum),运行最大超时次数(timeoutNumMax),超时处理逻辑(userEventTriggered,超时timeoutNumMax断开连接)
package demo4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * @author CBeann * @create 2019-09-20 14:00 */ public class TimeServerEventTrigger extends ChannelHandlerAdapter { //当前超时次数 private int timeoutNum = 0; //运行最大超时次数 private int timeoutNumMax = 4; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //如果有读操作,将timeoutNum==0 timeoutNum = 0; ctx.fireChannelRead(msg); } //读写超时发送心跳 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleState state = ((IdleStateEvent) evt).state(); if (state.equals(IdleState.READER_IDLE)) { System.out.println("客户端read超时" + (timeoutNum++) + "次"); if (timeoutNum > timeoutNumMax) { System.out.println("超时次数上限,关闭这个不活跃的连接"); ctx.channel().close(); } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端断开链接" + ctx.channel().localAddress().toString()); } }
运行结果:
######################伪代码#################### //客户端 socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS)); //服务端 socketChannel.pipeline().addLast(new IdleStateHandler(1, 1, 1, TimeUnit.SECONDS)); //客户端发数据 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好1".getBytes())); Thread.sleep(4000); f.channel().writeAndFlush(Unpooled.copiedBuffer("您好2".getBytes()));
客户端向服务端发送 您好1,然后睡4秒
这时候服务端没有读到数据,而且客户每5秒发送心跳
服务端的心跳检测是1秒,所以打印了3次read超时后,
客户端睡醒,发送了 您好2
服务端的心跳检测是1秒,所以打印了4次read超时后,达到timeoutNumMax上限,关闭连接,调用了channelInactive
服务端宕机客户端重连
这个不清楚,下面的参考超链接里的demo很优秀,实现了服务端宕机客户端多次重连
基本思路,代码看下面参考里的超链接
初步思路,报错调用自己,会超过栈的深度,最后报错
/** * Client client = new Client(); * System.out.println("连接服务器中。。。"); * client.connect(); * 问题:这样会出现一直递归,会把栈的深度用完,消耗内存 */ class Client { public void connect() throws Exception { try { Thread.sleep(2000); int i = 1 / 0; } catch (Exception e) { System.out.println("重新连接服务器中。。。"); Thread.sleep(2000); connect(); } } }
升级思路 :当报错的时候重新开启一个线程,总有一个线程存在调用connect方法
/** * Client2 client2 = new Client2(); * System.out.println("连接服务器中。。。"); * client2.connect(); */ class Client2 { public void connect() throws Exception { try { Thread.sleep(2000); int i = 1 / 0; } catch (Exception e) { System.out.println("重新连接服务器中。。。"); Thread.sleep(2000); new Thread(new Runnable() { @Override public void run() { try { Client2 client2 = new Client2(); client2.connect(); } catch (Exception e) { } } }).start(); } } }
参考
文章里有几个小问题
channelRead0上的 @Override去掉
重新 messageReceived 方法
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { System.out.println("client say"+o.toString()); //重置心跳次数 counter = 0; } @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("message receive"); }