基于Netty实现群聊系统

简介: 基于Netty实现群聊系统

在之前的文章中,我们介绍过基于Netty实现一个RPC框架。除此之外,在工作中Netty也被广泛应用于实现即时通讯的技术方案之一,今天我们就来看一看,基于Netty如何实现一个简单的群聊系统。

服务端启动代码中,创建两个EventLoopGroup事件循环线程组,bossGroup专门负责接收客户端的连接,workerGroup专门负责网络的读写。之后使用ServerBootstrap服务端启动引导类配置整个Netty程序,串联各个组件进行启动,这里对于参数的配置不再赘述。

public class GroupChatServer {
    private int port;
    public GroupChatServer(int port) {
        this.port = port;
    }
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("NETTY SERVER IS READY");
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在服务端启动时核心逻辑为:

  • 获取管道pipeline
  • pipeline中加入解码器和编码器
  • 加入业务处理器handler

具体的业务处理器在下面进行定义,继承SimpleChannelInboundHandler类,处理接收的消息:

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    ...
}

首先定义一个ChannelGroup 组,管理所有channel

GlobalEventExecutor.INSTANCE 表示是全局的事件执行器,是一个单例模式。

下面看看需要重写的几个核心方法:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天"+sdf.format(new Date())+"\n");
    channelGroup.add(channel);
}

handlerAdded 表示连接建立,一旦连接将被第一个被执行。在该方法中,将当前channel加入到channelGroup。并将该客户加入聊天的信息推送给其他在线的客户端,这里遍历channelGroup中所有channel,并发送消息。

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开了\n");
    System.out.println("channelGroup size:"+channelGroup.size());
}

断开连接会触发handlerRemoved方法。在该方法中,将下线消息推送给当前在线的客户。需要注意,执行了当前方法时就相当于已经执行了:

channelGroup.remove(channel);

这里会自动执行remove方法,所以就不需要我们再额外手动调用remove方法了。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println(ctx.channel().remoteAddress()+" 上线了");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println(ctx.channel().remoteAddress()+" 离线了");
}

channelActive方法和channelInactive方法表示channel处于活动状态或不活动状态,这里仅打印上下线信息。

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    Channel channel = ctx.channel();
    channelGroup.forEach(ch->{
        if(channel!=ch){
            ch.writeAndFlush("[客户]"+channel.remoteAddress()+" 发送了消息:"+msg+"\n");
        }else{显示自己发送的信息
            ch.writeAndFlush("[自己]发送了消息:"+msg+"\n");
        }
    });
}

channelRead0()是读取数据方法,在该方法中,遍历channelGroup,根据不同情况,发送不同消息。

  • 如果不是当前的channel,那么显示其他客户端发送了消息
  • 如果是当前的channel,显示自己发送了信息
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}

最后,在发生异常时执行exceptionCaught方法,关闭ChannelHandlerContext

客户端详细代码如下:

public class GroupChatClient {
    private final String host;
    private final int port;
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void run(){
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            System.out.println("NETTY CLIENT IS READY");
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("----"+channel.localAddress()+"---");
            Scanner scanner=new Scanner(System.in);
            while (scanner.hasNext()){
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg+"\r\n");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

获取管道、向管道加入编码解码器和设置业务处理器handler的过程与服务端基本相同。与服务端明显不同的是,客户端需要输入发送给别人的信息,因此创建一个扫描器,接收来自键盘的输入。

客户端业务处理器,同样继承SimpleChannelInboundHandler

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

相对于服务端相比非常简单,只需要打印接收的信息即可。

下面对这个过程进行一下测试,首先启动一个服务端和三个客户端,在服务端会打印客户端的上线信息:

image.png

服务端启动时打印的信息:

image.png

较早登录的客户端会收到服务端转发的其他客户端的上线信息:

image.png

发送接收消息测试,会根据发送者的不同进行显示的区分:

image.png

其余客户端下线时,当前客户端会显示下线信息:

image.png

服务端显示的下线信息:

image.png

到这里,一个群聊系统的基本功能就已经实现了。需要注意的是,这里是不支持点对点的聊天的,如果再需要点对点的聊天,那么就不能使用ChannelGroup了,可以使用HashMap缓存各个Channel的信息,实现定向的消息发送。

相关文章
|
4月前
|
消息中间件 缓存 算法
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
53 1
基于Netty的自研流系统缓存实现挑战: 内存碎片与OOM困境
|
3月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
141 1
|
存储 编解码 网络协议
Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包
Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包
119 0
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
消息中间件 前端开发 JavaScript
太顶了,使用 Netty 实现了一个 IM 即时通讯系统
太顶了,使用 Netty 实现了一个 IM 即时通讯系统
Netty实战与源码剖析(二)——基于NIO的群聊系统
Netty实战与源码剖析(二)——基于NIO的群聊系统
191 1
|
存储 Java Go
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
接上两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,本篇主要讲解的是通过实战编码实现IM的群聊功能,内容涉及群聊技术实现原理、编码实践等知识。
220 0
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
|
Java Unix
小六六学Netty系列之Netty群聊
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
141 0
|
JSON 前端开发 安全
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统
170 0
Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统