基于Netty实现群聊功能

简介: 基于Netty实现群聊功能

前言:

在前面有了对NIOBIO知识的学习,以及对netty结构组的基本了解,接下来将学习一下如何使用netty去实现一个群聊功能,读者可自行去对比基于NIOBIO、Netty实现群聊功能的不同方式,以更深刻的理解IO网络编程

学习内容:

整体思路:

1) 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)

2) 实现多人聊天

3) 服务器端:可以监测用户上线,离线,并实现消息转发

4) 客户端: 通过channel可以无阻塞发送消息给其他所有用户,同时可接收到其他用户发送的消息)

5)目的:进一步理解Netty非阻塞网络编程机制

具体实现

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:05
* @description: 群聊服务端
* @Version: 1.0
*/
public class NettyChartServer {
    /**
     * 服务端口号
     */
    private int port;
    public NettyChartServer(int port) {
        this.port = port;
    }
    /**
     * 服务执行:用于处理客户端请求
     */
    public void run() {
        //服务线程组
        EventLoopGroup bossgroup = new NioEventLoopGroup(1);
        //工作组线程池,默认为CPU核数*2
        EventLoopGroup workgroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossgroup, workgroup)
                    .channel(NioServerSocketChannel.class) //可以监听新进来的TCP连接的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                     .handler(new LoggingHandler(LogLevel.INFO))//心跳检测日志
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //获取到piepline管道
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //心跳检测handle
                          pipeline.addLast("idcheck",new IdleStateHandler(3,5,8, TimeUnit.SECONDS));
                            //加入自己的业务处理handler
                            pipeline.addLast("handle",new MyNettyServerHandler());
                        }
                    });
            ChannelFuture ch = bootstrap.bind(port).sync();
            System.out.println("服务器 is ready-------");
            //异步监听端口
            ch.addListeners(e->{
                if(e.isSuccess()){
                    System.out.println("监听端口 "+port+" 成功");
                }else {
                    System.out.println("监听端口 "+port+" 失败");
                }
            });
            //异步关闭通道
            ch.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //优雅退出线程池
            bossgroup.shutdownGracefully();
            workgroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        NettyChartServer server=new NettyChartServer(8888);
        server.run();
    }
}

服务端主要作用是对8888端口进行监听,等待客户端的请求。使用了主从Reactor模式,去实现接收处理多个客户端请求。利用自定义业务处理handler 完成对信息的获取,以及客户端直接信息转发。

服务端业务处理器

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.HashMap;
/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:43
* @description: 业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。
* @Version: 1.0
*/
public class MyNettyServerHandler  extends SimpleChannelInboundHandler<String> {
    //定义一个channerl集合,类似于List<Channel> ,用于存储不同的channel通道转发信息
    private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //設計一個集合可存儲通道通道对应的用户,未开发改功能
    private static HashMap<String,Channel> channelHashMap=new HashMap<>();
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 该channel建立连接后: 生命周期 handlerAdded》 channelRegistered-》channelActive,
     * 表示连接建立,一旦连接,第一个执行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------handlerAdded------");
        Channel channel = ctx.channel();
        channelGroup.add(channel);
        channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"加入聊天");
    }
    /**
     * 连接中断
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------handlerRemoved------");
        Channel channel=ctx.channel();
        //服务下线,移除该channel
        channelGroup.remove(channel);
        //并通知,已离线
        channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"离开了~");
    }
    /**
     * 表示该channel处于活动状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------channelActive------");
        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------channelInactive------");
        System.out.println(ctx.channel().remoteAddress() + " 下线了~");
    }
    /**
     * 读取消息
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println("------------channelRead0------");
        Channel channel = channelHandlerContext.channel();
        //将消息转发给其他客户端,并且排除自己
        channelGroup.forEach(e->{
            if (channel!=e){
                //
                e.writeAndFlush("【客戶端】"+channel.remoteAddress()+"说:"+msg);
            }else {
                e.writeAndFlush("【自己】"+"说:"+msg);
            }
        });
    }
    /**
     * 消息读取后
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------channelReadComplete------");
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------channelRegistered------");
    }
    /**
     * 离线生命:
     * @param ctx exceptionCaught-》channelInactive》channelUnregistered》handlerRemoved
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("------------channelUnregistered------");
    }
    /**
     * 处理心跳检测
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    //    System.out.println("------------userEventTriggered------");
        if (evt instanceof IdleStateEvent){
             //向下转型
            IdleStateEvent event=(IdleStateEvent)evt;
            String eventType=null;
            switch (event.state()){
                case  READER_IDLE:
                    eventType="读空闲";
                    break;
                case WRITER_IDLE:
                    eventType="写空闲";
                    break;
                case  ALL_IDLE:
                    eventType="读写空闲";
                    break;
                default:break;
            }
            System.out.println(ctx.channel().remoteAddress()+"---超时时间---"+eventType);
            System.out.println("服务器做相应操作");
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("------------exceptionCaught------");
    }
}

业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。当channle通道建立以后,便开启了生命周期。其中 channelRegistered 是用于channnel通道注册,channelActive用于查看通道是否活跃 、channelRead0用于读取客户端信息、exceptionCaught用于异常处理

客户端代码

客户端启动类

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:06
* @description: 客户端
* @Version: 1.0
*/
public class NettyChartClient {
    private String ip;
    private int port;
    public NettyChartClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
    /**
     * 客户端启动
     */
    public void run() {
        //创建工作线程池 CPU核数*2
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //得到Pipeline
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast("handler", (ChannelHandler) new MyNettyClientHandler());
                        }
                    });
            System.out.println("---客户端启动了----");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
            Scanner can=new Scanner(System.in);
            while (can.hasNext()){
                String str=can.nextLine();
                channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(""+str, CharsetUtil.UTF_8));
            }
            //异步关闭通道
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
         NettyChartClient chartClient=new NettyChartClient("127.0.0.1",8888);
         chartClient.run();
    }
}

客户端处理器

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 11:16
* @description: 客户端处理器
* @Version: 1.0
*/
public class MyNettyClientHandler  extends SimpleChannelInboundHandler<String> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(""+msg);
    }
}

客户端只需要获取服务端传来的消息,并且可以自己编写消息发送即可。

运行测试

接下来,先将服务端启动类启动,然后再运行多个客户端。查看控制台,并发送消息,查看channel的生命周期

20200401134307494.png

分别在各自的客户端控制台输入信息,测试。基于Netty实现的群聊功能就这样实现了

20200401134307494.png

有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~

相关文章
|
6月前
|
编解码 缓存 网络协议
Netty核心功能学习
Netty核心功能学习
61 0
|
前端开发 Java 应用服务中间件
SpringBoot整合Netty搭建高性能Websocket服务器(实现聊天功能)
之前使用Springboot整合了websocket,实现了一个后端向前端推送信息的基本小案例,这篇文章主要是增加了一个新的框架就是Netty,实现一个高性能的websocket服务器,并结合前端代码,实现一个基本的聊天功能。你可以根据自己的业务需求进行更改。 这里假设你已经了解了Netty和websocket的相关知识,仅仅是想通过Springboot来整合他们。根据之前大家的需求,代码已经上传到了github上。在文末给出。 废话不多说,直接看步骤代码。
1871 0
SpringBoot整合Netty搭建高性能Websocket服务器(实现聊天功能)
|
开发框架 JavaScript 前端开发
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
307 0
|
存储 编解码 网络协议
Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包
Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包
119 0
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
前端开发 网络协议 Java
Netty(一)Netty核心功能与线程模型1
Netty(一)Netty核心功能与线程模型
167 0
Netty实战与源码剖析(二)——基于NIO的群聊系统
Netty实战与源码剖析(二)——基于NIO的群聊系统
191 1
Netty(一)Netty核心功能与线程模型2
Netty(一)Netty核心功能与线程模型
72 0
|
存储 fastjson Java
|
存储 Java Go
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
接上两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,本篇主要讲解的是通过实战编码实现IM的群聊功能,内容涉及群聊技术实现原理、编码实践等知识。
220 0
基于Netty,从零开发IM(三):编码实践篇(群聊功能)