⑦加入群聊(gjoin [group name])
客户端:解析命令,封装成GroupJoinRequestMessage发送出去。
System.out.println("gjoin [group name]"); case "gjoin" : ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1])); break;
服务端:获取到群名、用户名,来执行业务操作。
import com.changlu.message.GroupJoinRequestMessage; import com.changlu.message.GroupJoinResponseMessage; import com.changlu.server.session.Group; import com.changlu.server.session.GroupSessionFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName GroupJoinRequestMessageHandler * @Author ChangLu * @Date 2022/1/14 16:04 * @Description 加入群聊处理器:【gjoin [group name]】 */ @ChannelHandler.Sharable public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception { String groupName = msg.getGroupName(); String username = msg.getUsername(); Group group = GroupSessionFactory.getGroupSession().joinMember(groupName, username); if (group != null){ ctx.writeAndFlush(new GroupJoinResponseMessage(true, "群聊【"+ groupName +"】加入成功!")); }else{ ctx.writeAndFlush(new GroupJoinResponseMessage(false, "当前无该群聊!")); } } }
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler(); ch.pipeline().addLast(GROUP_JOIN_HANDLER);//加入群聊处理handler
效果:
⑧退出群聊(gquit [group name])
客户端:解析命令,封装成GroupQuitRequestMessage发送出去。
System.out.println("gquit [group name]"); case "gquit" : ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1])); break;
服务端:获取到群名、用户名,来执行业务操作。
import com.changlu.message.GroupQuitRequestMessage; import com.changlu.message.GroupQuitResponseMessage; import com.changlu.server.session.Group; import com.changlu.server.session.GroupSessionFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName GroupQuitRequestMessageHandler * @Author ChangLu * @Date 2022/1/14 16:09 * @Description 退出群聊处理器:【gquit [group name]】 */ @ChannelHandler.Sharable public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception { final String username = msg.getUsername(); final String groupName = msg.getGroupName(); final Group group = GroupSessionFactory.getGroupSession().removeMember(groupName, username); if (group != null) { ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出群聊成功!")); }else{ ctx.writeAndFlush(new GroupQuitResponseMessage(true, "该群聊不存在!")); } } }
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler(); ch.pipeline().addLast(GROUP_QUIT_HANDLER);//退出群聊处理handler
效果:
⑨退出登陆
客户端:解析命令,直接客户端关闭连接即可。
System.out.println("quit"); case "quit" : ctx.channel().close(); break;
服务端:若是客户端自己主动断开连接,那么服务端也会触发指定的inactive操作,那么此时取消绑定的channel集合!
import com.changlu.server.session.SessionFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; /** * @ClassName QuitHandler * @Author ChangLu * @Date 2022/1/14 16:16 * @Description 退出连接执行器 */ @Slf4j @ChannelHandler.Sharable public class QuitHandler extends ChannelInboundHandlerAdapter { // 当连接断开时触发inactive事件 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); log.debug("{} 连接已断开!", ctx.channel()); } // 当出现异常时触发 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage()); } }
QuitHandler QUIT_HANDLER = new QuitHandler(); ch.pipeline().addLast(QUIT_HANDLER);//退出及处理异常handler
效果:
⑩空闲检测(发送心跳)
知识点说明
网络编程中容易出现的问题:连接假死。对于服务器高负荷的时候就是一个很大的问题。netty提供了这中假死的方式,就是空闲检测器。(就是一个handler,IdleStateHandler),对于服务器来讲若是很长时间没有收到客户端数据,就可能认为该连接有问题或者很长一段时间写出的数据没有写出去。只要去监测你是不是读或写空闲时间太长了就行了或者读和写加在一起空闲的长度。
IdleStateHandler·:三个参数构造,参数1检测读的空闲时间超过了某秒,参数2检测写的空闲时间超过了多少秒,参数3检测读写都空闲的时间上线。单位秒。
若是指定秒数中没有收到channel发来数据,那么就会触发事件(read or write …),可以编写ChannelDuplexHandler重写其中的userEventTriggered来进行判断触发了什么事件。
心跳:针对于连接假死,人没有发数据,但可以让客户端定时的发送数据,为了向服务器证明或者就像服务器发送心跳包,那么在服务端、客户端各自设置自动检测,服务端检测读事件,客户端检测写事件,通常客户端的检测时间是服务端1/2时间。
代码
server:IdleStateHandler来指定检测事件的秒数来触发事件,ChannelDuplexHandler中用于进行指定事件捕获来关闭连接
@Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); //在服务端,主要用来判断是不是读空闲时间过长 // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件 ch.pipeline().addLast(new IdleStateHandler(5, 0, 0)); ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器 //用来捕获IdleStateHandler的事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; //触发读事件 if (event.state() == IdleState.READER_IDLE) { log.debug("已经5s没有读到数据了!"); //关闭channel连接 ctx.channel().close(); } } }); //...其他handler }
client:对于服务端这种检测机制,客户端需要每3秒(服务端监测的一半)自动发送一个心跳消息来表示当前一直在连接着
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); // ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); //客户端,主要用来判断是不是写空闲时间过长,来进行发送心跳,表示当前用户并没有断开 // 3s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件 ch.pipeline().addLast(new IdleStateHandler(0, 3, 0)); ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器 //用来捕获IdleStateHandler的事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; //触发读事件 if (event.state() == IdleState.WRITER_IDLE) { log.debug("已经3s没有读到数据了!"); ctx.channel().writeAndFlush(new PingMessage()); } } }); } }
效果:当服务端一直收到消息,那么就一直不会触发指定的事件,自然也不会关闭channel
扩展
①指定用户下线(quitmember [username])
客户端:解析命令quitmember [username],封装成QuitMemberRequestMessage对象发出。
System.out.println("quitmember [username]"); case "quitmember": ctx.writeAndFlush(new QuitMemberRequestMessage(split[1])); break;
针对于断开连接,需要额外定义如下两个事情:
设置一个布尔变量EXIT(AtomicBoolean),用于进行线程通信表示断开连接,在三个控制台等待输入阻塞事件下分别来对其EXIT判断是否断开连接!
重写channelInactive、exceptionCaught事件,一个是断开连接会触发的事件,另一个是出现异常捕捉到的事件。一旦捕获就设置EXIT为true!
AtomicBoolean EXIT = new AtomicBoolean(false);//检测断开连接变量
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter(){ // 在连接断开时触发 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.debug("连接已断开,请按任意键退出..."); EXIT.set(true); } // 在出现异常时触发 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("连接已断开,请按任意键退出...,异常信息:{}",cause.getMessage()); EXIT.set(true); } @Override public void channelActive(ChannelHandlerContext ctx){ //负责接收用户在控制台上的输入,负责向服务器发送数据 new Thread(()->{ //... System.out.println("请输入用户名:"); String username = scanner.nextLine(); if (EXIT.get()){ return; } System.out.println("请输入密码:"); String password = scanner.nextLine(); if (EXIT.get()){ return; } //while(true)中等待输入命令下添加判断 String command = scanner.nextLine(); if (EXIT.get()){ return; } //... } }
服务端:编写针对于对QuitMemberRequestMessage感兴趣的handler,来取出指定下线的username,接着从sessionFactory中取到channel,进行手动断开即可!
import com.changlu.message.QuitMemberRequestMessage; import com.changlu.message.QuitMemberResponseMessage; import com.changlu.server.session.SessionFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName QuitMemberRequestMessageHandler * @Author ChangLu * @Date 2022/1/14 21:48 * @Description 指定用户下线执行器:【quitmember [username]】 */ @ChannelHandler.Sharable public class QuitMemberRequestMessageHandler extends SimpleChannelInboundHandler<QuitMemberRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, QuitMemberRequestMessage msg) throws Exception { String username = msg.getUsername(); Channel channel = SessionFactory.getSession().getChannel(username); if (channel == null) { ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户不在线!")); } else { //取消绑定以及强制关闭 SessionFactory.getSession().unbind(channel); channel.close(); ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户已强制下线!")); } } }
QuitMemberRequestMessageHandler QUIT_MEMBER_HANDLER = new QuitMemberRequestMessageHandler(); ch.pipeline().addLast(QUIT_MEMBER_HANDLER);//强制下线用户handler
效果: