05、Netty学习笔记—(案例:聊天业务)(二)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 05、Netty学习笔记—(案例:聊天业务)(二)

⑦加入群聊(gjoin [group name])


客户端:解析命令,封装成GroupJoinRequestMessage发送出去。


System.out.println("gjoin [group name]");
case "gjoin" :
    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
    break;


服务端:获取到群名、用户名,来执行业务操作。


import com.changlu.message.GroupJoinRequestMessage;
import com.changlu.message.GroupJoinResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName GroupJoinRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:04
 * @Description 加入群聊处理器:【gjoin [group name]】
 */
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String username = msg.getUsername();
        Group group = GroupSessionFactory.getGroupSession().joinMember(groupName, username);
        if (group != null){
            ctx.writeAndFlush(new GroupJoinResponseMessage(true, "群聊【"+ groupName +"】加入成功!"));
        }else{
            ctx.writeAndFlush(new GroupJoinResponseMessage(false, "当前无该群聊!"));
        }
    }
}



GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
ch.pipeline().addLast(GROUP_JOIN_HANDLER);//加入群聊处理handler


效果:




⑧退出群聊(gquit [group name])


客户端:解析命令,封装成GroupQuitRequestMessage发送出去。


System.out.println("gquit [group name]");
case "gquit" :
    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
    break;


服务端:获取到群名、用户名,来执行业务操作。


import com.changlu.message.GroupQuitRequestMessage;
import com.changlu.message.GroupQuitResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName GroupQuitRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:09
 * @Description 退出群聊处理器:【gquit [group name]】
 */
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
        final String username = msg.getUsername();
        final String groupName = msg.getGroupName();
        final Group group = GroupSessionFactory.getGroupSession().removeMember(groupName, username);
        if (group != null) {
            ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出群聊成功!"));
        }else{
            ctx.writeAndFlush(new GroupQuitResponseMessage(true, "该群聊不存在!"));
        }
    }
}



GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
ch.pipeline().addLast(GROUP_QUIT_HANDLER);//退出群聊处理handler


效果:



⑨退出登陆


客户端:解析命令,直接客户端关闭连接即可。


System.out.println("quit");
case "quit" :
    ctx.channel().close();
    break;


服务端:若是客户端自己主动断开连接,那么服务端也会触发指定的inactive操作,那么此时取消绑定的channel集合!


import com.changlu.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
 * @ClassName QuitHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:16
 * @Description 退出连接执行器
 */
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
    // 当连接断开时触发inactive事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 连接已断开!", ctx.channel());
    }
    // 当出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
    }
}


QuitHandler QUIT_HANDLER = new QuitHandler();
ch.pipeline().addLast(QUIT_HANDLER);//退出及处理异常handler


效果:



⑩空闲检测(发送心跳)


知识点说明


网络编程中容易出现的问题:连接假死。对于服务器高负荷的时候就是一个很大的问题。netty提供了这中假死的方式,就是空闲检测器。(就是一个handler,IdleStateHandler),对于服务器来讲若是很长时间没有收到客户端数据,就可能认为该连接有问题或者很长一段时间写出的数据没有写出去。只要去监测你是不是读或写空闲时间太长了就行了或者读和写加在一起空闲的长度。


IdleStateHandler·:三个参数构造,参数1检测读的空闲时间超过了某秒,参数2检测写的空闲时间超过了多少秒,参数3检测读写都空闲的时间上线。单位秒。

若是指定秒数中没有收到channel发来数据,那么就会触发事件(read or write …),可以编写ChannelDuplexHandler重写其中的userEventTriggered来进行判断触发了什么事件。

心跳:针对于连接假死,人没有发数据,但可以让客户端定时的发送数据,为了向服务器证明或者就像服务器发送心跳包,那么在服务端、客户端各自设置自动检测,服务端检测读事件,客户端检测写事件,通常客户端的检测时间是服务端1/2时间。


代码


server:IdleStateHandler来指定检测事件的秒数来触发事件,ChannelDuplexHandler中用于进行指定事件捕获来关闭连接


@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());
    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    //在服务端,主要用来判断是不是读空闲时间过长
    // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器
        //用来捕获IdleStateHandler的事件
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent event = (IdleStateEvent) evt;
            //触发读事件
            if (event.state() == IdleState.READER_IDLE) {
                log.debug("已经5s没有读到数据了!");
                //关闭channel连接
                ctx.channel().close();
            }
        }
    });
    //...其他handler
}



client:对于服务端这种检测机制,客户端需要每3秒(服务端监测的一半)自动发送一个心跳消息来表示当前一直在连接着


bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ProcotolFrameDecoder());
        //                    ch.pipeline().addLast(LOGGING_HANDLER);
        ch.pipeline().addLast(MESSAGE_CODEC);
        //客户端,主要用来判断是不是写空闲时间过长,来进行发送心跳,表示当前用户并没有断开
        // 3s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
        ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
        ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器
            //用来捕获IdleStateHandler的事件
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                IdleStateEvent event = (IdleStateEvent) evt;
                //触发读事件
                if (event.state() == IdleState.WRITER_IDLE) {
                    log.debug("已经3s没有读到数据了!");
                    ctx.channel().writeAndFlush(new PingMessage());
                }
            }
        });
    }
}



效果:当服务端一直收到消息,那么就一直不会触发指定的事件,自然也不会关闭channel



扩展

①指定用户下线(quitmember [username])

客户端:解析命令quitmember [username],封装成QuitMemberRequestMessage对象发出。


System.out.println("quitmember [username]");
case "quitmember":
    ctx.writeAndFlush(new QuitMemberRequestMessage(split[1]));
    break;


针对于断开连接,需要额外定义如下两个事情:

设置一个布尔变量EXIT(AtomicBoolean),用于进行线程通信表示断开连接,在三个控制台等待输入阻塞事件下分别来对其EXIT判断是否断开连接!

重写channelInactive、exceptionCaught事件,一个是断开连接会触发的事件,另一个是出现异常捕捉到的事件。一旦捕获就设置EXIT为true!

AtomicBoolean EXIT = new AtomicBoolean(false);//检测断开连接变量


ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter(){
    // 在连接断开时触发
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("连接已断开,请按任意键退出...");
        EXIT.set(true);
    }
    // 在出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.debug("连接已断开,请按任意键退出...,异常信息:{}",cause.getMessage());
        EXIT.set(true);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        //负责接收用户在控制台上的输入,负责向服务器发送数据
        new Thread(()->{
            //...
            System.out.println("请输入用户名:");
            String username = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            System.out.println("请输入密码:");
            String password = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            //while(true)中等待输入命令下添加判断
            String command = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            //...
    }
}


服务端:编写针对于对QuitMemberRequestMessage感兴趣的handler,来取出指定下线的username,接着从sessionFactory中取到channel,进行手动断开即可!


import com.changlu.message.QuitMemberRequestMessage;
import com.changlu.message.QuitMemberResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName QuitMemberRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 21:48
 * @Description 指定用户下线执行器:【quitmember [username]】
 */
@ChannelHandler.Sharable
public class QuitMemberRequestMessageHandler extends SimpleChannelInboundHandler<QuitMemberRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, QuitMemberRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        Channel channel = SessionFactory.getSession().getChannel(username);
        if (channel == null) {
            ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户不在线!"));
        } else {
            //取消绑定以及强制关闭
            SessionFactory.getSession().unbind(channel);
            channel.close();
            ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户已强制下线!"));
        }
    }
}


QuitMemberRequestMessageHandler QUIT_MEMBER_HANDLER = new QuitMemberRequestMessageHandler();
ch.pipeline().addLast(QUIT_MEMBER_HANDLER);//强制下线用户handler


效果:



相关文章
|
7月前
|
JSON 算法 Dubbo
Netty入门实践-模拟IM聊天
本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。
|
7月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
384 0
|
网络协议
由浅入深Netty聊天室案例
由浅入深Netty聊天室案例
58 0
|
消息中间件 分布式计算 NoSQL
由浅入深Netty入门案例
由浅入深Netty入门案例
133 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12657 12
|
开发框架 JavaScript 前端开发
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
316 0
Netty入门到超神系列-聊天室案例
对于服务端而言需要做如下事情 selector监听客户端的链接 如果有“读”事件,就从通道读取数据 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发 对于客户端而言需要做如下事情 selector监听服务端的“读”事件 如果有数据从通道中读取数据,打印到控制台 监听键盘输入,向服务端发送消息
115 0
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
网络协议 前端开发 Java
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
|
JSON Java 关系型数据库
JAVA基于Swing和Netty,仿QQ界面聊天小项目
先说一下这个小项目也算是我在大学做得第一个应该算的上是的项目的项目,前前后后用了20天左右吧。先是用swing写好了仿QQ界面(界面很丑)最后逻辑实现都是后面断断续续加进去的。写这个项目之前没有很好的规划在逻辑实现方面与数据库逻辑交互过于频繁。走了很多的弯路