05、Netty学习笔记—(案例:聊天业务)(一)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 05、Netty学习笔记—(案例:聊天业务)(一)

坑点说明


1、自己实现SimpleChannelInboundHandler的子类不添加@Sharable注解导致第二个客户端连接不上


针对于自己写继承SimpleChannelInboundHandler的类若是不加上@ChannelHandler.Sharable注解,当第二个客户端连接时就会立刻执行INACTIVE、UNREGISTERED事件,直接就会连接失败!




出现的问题展示:第二个客户端直接连不上



解决方案:在自定义handler上加上@Sharable注解即可!



具体业务草稿

客户端、服务端定义好指定的传输协议,之后根据指定的传输协议来进行传输数据。


首先是登陆业务:客户端在启动时创建一个线程来构造出指定的消息对象并进行发送数据。(该过程会按照指定的方式来进行对数据写协议,最终发送出去);服务端按照指定的协议规则进行处理取到发过来的协议数据并将其转为指定的对象。


①登陆-线程通信:客户端接收到服务端响应过来的数据后,如何进行两个线程见的交互处理。(一个是客户端active事件执行时创建的线程来发送给服务端数据的,另一个是客户端在执行channelread读取到服务端的数据(该线程是nio中),如何让这两个线程进行通信?)


使用一个countdownlatch工具类进行线程通信确定是否登陆接收到信息,对于登陆是否成功的状态则使用一个并发变量AtomicBoolean来表示。

②业务消息发送(客户端):登陆成功之后,使用一个线程来不断取得控制台的命令信息,根据不同的命令来进行发送指定的消息对象。


③单聊消息业务:在服务器端维护一个session集合,若是登陆成功就加入到该集合中(username,channel)。对于单聊业务消息主要有三部分内容:发送方,目的方,内容。服务端收到单聊消息时,从session集合中根据目的方取到指定channel,再使用该channel向目的方方发送数据。


④群聊建群拉人处理:create 群名 人1,人2,人3,使用一个map集合来临时存储用户名及对应channel的关系,在创建的过程中会向另外几个人的channel写提示数据,表示已经拉入群。


⑤群聊消息发送:gsend 群名 信息,根据群名称,先从sessionGroup中取到所有的用户名,接着根据每个用户取到sessionFactory中的所有channel,来依次发送群聊消息。


⑥获取群成员信息:gmembers 群民 信息,发送之后根据群名取到所有在群中的channel,接着依次向各个channel来进行写数据。


⑦加入群聊:gjoin [group name],直接添加到指定群集合中。


⑧退出群聊:gquit [group name],对相关联的集合进行解绑。


⑨退出登陆:客户端进行断开连接,服务端的话要进行一系列解绑动作!正常退出、异常退出:实现一个handleradapter即可,重写其中的channelInactive、exceptionActive方法


⑩空闲检测(发送心跳):设置IdleStateHandler来指定读、写、读+写监控时间描述,若是指定秒数后依旧没有事件发生,那么就会触发IdleState的事件,可以使用ChannelDuplexHandler重写其中的userEventTriggered来进行捕获。一般服务端对读进行设置5s,客户端进行写监控3秒(服务端一半时间)来进行发送心跳,表示当前还在连接中!


扩展:


①指定用户下线:quitmember [username]


客户端:解析命令来发送给服务端进行用户下线操作,指定某个用户名。

使用一个并发布尔变量俩进行表示下线,重写channelInactive、exceptionCaught事件来进行EXIT变量的设置,并且对于客户端三个读取控制台的阻塞事件下对EXIT变量进行判断提示已断开连接!

服务端:类似相同的对指定对象感兴趣handler,来进行业务操作,取到指定username的channel进行close()操作!


具体业务实现


①登陆业务


客户端:


①发送:自定义线程在channelActive事件中运行一个线程来主要与我们控制台进行交互,登陆业务同样也是如此,首先需要输入用户名密码,接着将其包装成预先设置好的LoginRequestMessage对象由channel发送出去。

②接收:eventloop中的线程接收到经过自定义协议解码取到的对象,将其转为LoginResponseMessage对象,判断其是否登陆成功。

核心:对于如何让eventloop中线程来进行通知主线程登陆成功,我们可以使用一个countdownlatch+AtomicBoolean,前者用于通知主线程拿到登陆结果,后者用于表示登陆的状态成功与否!

服务端:编写一个实现SimpleChannelInboundHandler的子类,指定接收LoginRequestMessage对象,接着来编写对应的channelRead()方法来进行业务操作,最终根据实际情况来向客户端返回一个LoginResponseMessage。


客户端


client:
//登陆消息通知计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
//成功状态变量
AtomicBoolean LOGIN = new AtomicBoolean(false);
//负责接收服务器的响应数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("msg: {}", msg);
    //单独处理登陆的响应结果,其他结果直接输出消息内容
    if (msg instanceof LoginResponseMessage) {
        LoginResponseMessage response = (LoginResponseMessage) msg;
        if (response.isSuccess()){
            LOGIN.set(true);//设置登陆状态为true
        }
        WAIT_FOR_LOGIN.countDown();//计数-1,若是为0,则会通知使用该计数器阻塞等待的线程
    }
}
@Override
public void channelActive(ChannelHandlerContext ctx){
    //负责接收用户在控制台上的输入,负责向服务器发送数据
    new Thread(()->{
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入用户名:");
        String username = scanner.nextLine();
        System.out.println("请输入密码:");
        String password = scanner.nextLine();
        //构造登陆消息对象发送给服务端
        Message message = new LoginRequestMessage(username, password);
        ctx.channel().writeAndFlush(message);
        try {
            //等待其他线程进行计数为0,此时才会唤醒向下执行
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ...
    }, "system in").start();
}


服务端


import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description 聊天请求对象处理器:针对于ChatRequestMessage
 */
//该处理器表示对解码得到的ChatRequestMessage对象感兴趣(根据自定义协议解码时读取到的对象决定,实际就是与客户端发送来的对象类型有关!)  可以看到这个ChatRequestMessage对象与在客户端封装的ChatRequestMessage一致
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //若是对方已下线,告知发送方消息
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "对方用户不存在或已下线!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}


server:


//handler
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
 @Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());//定长解码器(自己做的实现,已经定义好了解码规则)
    ch.pipeline().addLast(LOGGING_HANDLER);//日志处理器
    ch.pipeline().addLast(MESSAGE_CODEC);//协议解码器
    ch.pipeline().addLast(LOGIN_HANDLER);//【登陆处理handler】
}


效果:


服务端:



客户端



②客户端根据命令实现业务发送


需求:登陆成功之后,显示指定的提示信息来进行处理用户输入的命令;登陆失败,关闭连接。


客户端:同样在发送登陆请求的线程中执行。


@Override
public void channelActive(ChannelHandlerContext ctx){
    //负责接收用户在控制台上的输入,负责向服务器发送数据
    new Thread(()->{
        //...
        try {
            //接收响应后计数-1,停止阻塞继续向下执行
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取登陆状态变量
        if (!LOGIN.get()) {
            System.out.println("登陆失败!");
            ctx.channel().close();
            return;
        }
        System.out.println("登陆成功!");
        while (true) {
            System.out.println("==================================");
            System.out.println("send [username] [content]");
            System.out.println("gsend [group name] [content]");
            System.out.println("gcreate [group name] [m1,m2,m3...]");
            System.out.println("gmembers [group name]");
            System.out.println("gjoin [group name]");
            System.out.println("gquit [group name]");
            System.out.println("quit");
            System.out.println("==================================");
            String command = scanner.nextLine();
            String[] split = command.split(" ");
            switch (split[0]){
                case "send" :
                    ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gsend" :
                    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gcreate" :
                    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
                    ctx.writeAndFlush(new GroupCreateRequestMessage(split[0],users));
                    break;
                case "gmembers" :
                    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
                    break;
                case "gjoin" :
                    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
                    break;
                case "gquit" :
                    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
                    break;
                case "quit" :
                    ctx.channel().close();
                    break;
            }
        }
    }, "system in").start();
}



③单聊业务(send [username] [content])


首先看一下基于内存的session实现:


import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SessionMemoryImpl implements Session {
    //保存用户名与channel映射的map集合
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    //保存channel与用户名映射的map集合
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    //用于保存channel与绑定具体属性的map集合
    private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
    //登陆成功:将对应的映射保存到三个map即可中
    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username, channel);
        channelUsernameMap.put(channel, username);
        channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }
    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        usernameChannelMap.remove(username);
        channelAttributesMap.remove(channel);
    }
    @Override
    public Object getAttribute(Channel channel, String name) {
        return channelAttributesMap.get(channel).get(name);
    }
    @Override
    public void setAttribute(Channel channel, String name, Object value) {
        channelAttributesMap.get(channel).put(name, value);
    }
    //根据username获取指定的channel
    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }
    @Override
    public String toString() {
        return usernameChannelMap.toString();
    }
}


client:读取到控制台输入的命令信息,封装成一个ChatRequestMessage发送出去。


case "send" :
ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
break;


server:订阅经过自定义协议解码得到ChatRequestMessage对象,并对其来急性业务处理。


具体业务操作:①取到目的方名称,从session对象中取出指定的channel。②判断channel是否为空,若是为空表示该用户不在线,那么回发一条提示数据;若是不为空,则直接将数据内容由取到的channel发出。
新增channelUnregistered重写事件:解除绑定登陆的session channel。
import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description 聊天请求对象处理器:针对于ChatRequestMessage
 */
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //若是对方已下线,告知发送方消息(取到null说明对方压根没有登陆上线过)
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "对方用户不存在或已下线!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}


服务端:


//该handler是可共享的,线程安全的
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
           //...
           ch.pipeline().addLast(CHAT_HANDLER);//聊天请求处理handler
    }
});


效果:服务端先启动,之后启动两个client,并依次登陆lisi、wangwu。


登陆好之后lisi client执行命令send wangwu 123,此时服务器端收到消息,并使用wangwu的channel进行发送数据:



只登陆lisi client,接着直接执行命令send wangwu hello,此时lisi client收到消息,wangwu client还未上线



④群聊建群拉人处理(gcreate [group name] [m1,m2,m3…])


客户端:解析命令,读取到群名以及成员,邀请成员+自己组成一个set集合,最终包装成GroupCreateRequestMessage发送出去。


case "gcreate" :
    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
    users.add(username);//将自己也添加到群聊中
    ctx.writeAndFlush(new GroupCreateRequestMessage(split[1],users));
    break;


服务端:编写一个对GroupCreateRequestMessage感兴趣的handler,来进行业务处理,核心本质就是向HashMap中添加群组、set成员集合。最终根据是否添加成功来进行相应的处理。


import com.changlu.message.GroupCreateRequestMessage;
import com.changlu.message.GroupCreateResponseMessage;
import com.changlu.server.session.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
import java.util.Set;
/**
 * @ClassName GroupCreateRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 14:46
 * @Description 新建群聊处理:创建群聊,并且拉入指定成员
 */
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler  extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        Set<String> members = msg.getMembers();
        //群组管理器
        final GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        //若是返回为null,表示原先没有,当前插入成功
        if (group == null){
            //响应一:创建成功,向原始客户端发送一条创建成功消息
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(true, "群组创建成功!"));
            List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
            //响应二:向所有被拉入群聊的客户端发送一条被拉入群聊消息
            for (Channel channel : membersChannel) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "你已经被拉取群聊:" + groupName));
            }
        } else {
            //响应三:创建失败,向源客户端发送提示信息
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(false, "群组已存在!"));
        }
    }
}


//处理GroupCreateRequestMessage的handler处理器
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ...
    ch.pipeline().addLast(GROUP_CREATE_HANDLER);//创建群聊拉人处理handler
}


效果:创建三个客户端lisi、wangwu、zhaoliu,记着lisi client执行命令gcreate DreamGroup wangwu,zhaoliu


1、群聊创建成功,lisi client首先收到创建群聊成功消息,接着lisi自己以及新拉入群聊的wangwu、zhaoliu都收到被拉入群聊消息



其他人收到被拉入信息这里就不展示了


2、群聊创建失败,lisi client收到创建群聊失败消息。



⑤群聊消息发送(gsend [group name] [content])


客户端:解析命令,封装成GroupChatRequestMessage对象发送出去。


case "gsend" :
    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
    break;


服务端:编写对GroupChatRequestMessage感兴趣的handler,紧接着根据群名获取到所有的channel,接着依次根据channel向外发送出去数据。


import com.changlu.message.GroupChatRequestMessage;
import com.changlu.message.GroupChatResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
/**
 * @ClassName GroupChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:26
 * @Description 群组聊天:向群组发送一条消息。【gcreate [group name] [m1,m2,m3...]】
 */
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String content = msg.getContent();
        //根据群名取出所有的channel来进行发送数据
        List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        GroupChatResponseMessage responseMessage = new GroupChatResponseMessage(msg.getFrom(), content);
        responseMessage.setSuccess(true);
        for (Channel channel : membersChannel) {
            channel.writeAndFlush(responseMessage);
        }
    }
}


//指定的执行器
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ...
    ch.pipeline().addLast(GROUP_CHAT_HANDLER);//向群聊发送消息处理handler
}


效果:在执行命令之前,首先需要执行gcreate命令,先创建群聊以及拉人,创建成功之后才能够进行群聊消息命令的发送执行





⑥获取群成员信息(gmembers [group name])


客户端:解析命令,封装成GroupMembersRequestMessage对象发送出去。


System.out.println("gmembers [group name]");
case "gmembers" :
    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
    break;


服务端:编写对GroupMembersRequestMessage感兴趣的handler,接着来执行业务操作,向源channel发送群成员信息。


import com.changlu.message.GroupMembersRequestMessage;
import com.changlu.message.GroupMembersResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Set;
/**
 * @ClassName GroupMembersRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:59
 * @Description 查看群成员信息:根据群名获取所有群成员信息。【gmembers [group name]】
 */
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
        final String groupName = msg.getGroupName();
        final Set<String> members = GroupSessionFactory.getGroupSession().getMembers(groupName);
        ctx.channel().writeAndFlush(new GroupMembersResponseMessage(members));
    }
}


GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();
ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);//获取指定群聊所有群成员处理handler


效果:




相关文章
|
7月前
|
JSON 算法 Dubbo
Netty入门实践-模拟IM聊天
本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。
|
7月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
402 0
|
网络协议
由浅入深Netty聊天室案例
由浅入深Netty聊天室案例
59 0
|
消息中间件 分布式计算 NoSQL
由浅入深Netty入门案例
由浅入深Netty入门案例
133 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12659 12
|
开发框架 JavaScript 前端开发
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
322 0
Netty入门到超神系列-聊天室案例
对于服务端而言需要做如下事情 selector监听客户端的链接 如果有“读”事件,就从通道读取数据 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发 对于客户端而言需要做如下事情 selector监听服务端的“读”事件 如果有数据从通道中读取数据,打印到控制台 监听键盘输入,向服务端发送消息
116 0
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
网络协议 前端开发 Java
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
|
JSON Java 关系型数据库
JAVA基于Swing和Netty,仿QQ界面聊天小项目
先说一下这个小项目也算是我在大学做得第一个应该算的上是的项目的项目,前前后后用了20天左右吧。先是用swing写好了仿QQ界面(界面很丑)最后逻辑实现都是后面断断续续加进去的。写这个项目之前没有很好的规划在逻辑实现方面与数据库逻辑交互过于频繁。走了很多的弯路