前端篇:https://lux-sun.blog.csdn.net/article/details/101354215
后端篇
本文思路是按顺序进行的,必要说明的地方会唠嗑几句。
0、辅助类
packagecom.wxgj.center.common; publicclassConst { publicstaticfinalByteOP_SUCCESS=1; // NettypublicstaticfinalIntegerINET_PORT=8888; publicstaticfinalIntegerCONNECT=1; publicstaticfinalIntegerHEART=2; publicstaticfinalStringWEBSOCKET_URL="/websocket"; }
packagecom.wxgj.center.netty; importio.netty.channel.ChannelId; importio.netty.channel.group.ChannelGroup; importio.netty.channel.group.DefaultChannelGroup; importio.netty.util.concurrent.GlobalEventExecutor; importjava.util.HashMap; importjava.util.Map; publicclassGlobal { // 存储类,以下类是用来存储访问的channle,channelGroup的原型是set集合,保证channle的唯一,如需根据参数标注存储,可以使用currentHashMap来存储publicstaticChannelGroupgroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 用户userId绑定用户自己的频道IDpublicstaticMap<String, Map<ChannelId,Integer>>channelMap=newHashMap<>(); /*** @Author Big Jin* @Description: 删除失效的频道* @Param: [c]* @Return: void* @Create: 2019/7/26 17:04*/publicstaticvoidremoveChannelMapByChannelId(ChannelIdc){ for (Map.Entry<String, Map<ChannelId,Integer>>entry:channelMap.entrySet()){ Map<ChannelId,Integer>channelIdMap=entry.getValue(); if(channelIdMap.get(c)!=null){ channelIdMap.remove(c); if(channelIdMap.size()==0){ channelMap.remove(entry.getKey()); } return; } } } }
packagecom.wxgj.center.netty; publicclassSocketRequestBean { privateIntegeroperand; privateStringuserId; publicSocketRequestBean() { } // ... set / get}
packagecom.wxgj.center.netty; publicclassSocketResultBean { privateIntegerstatus; privateStringmsg; privateStringdata; publicSocketResultBean() { } // ... set / get}
packagecom.wxgj.center.netty; importcom.alibaba.fastjson.JSON; importio.netty.channel.Channel; importio.netty.channel.ChannelId; importio.netty.channel.group.ChannelGroup; importio.netty.channel.group.DefaultChannelGroup; importio.netty.handler.codec.http.websocketx.TextWebSocketFrame; importio.netty.util.concurrent.GlobalEventExecutor; importjava.util.ArrayList; importjava.util.List; importjava.util.Map; publicclassNettyUtil { /*** @Author Big Jin* @Description: 发送单个用户消息带留言* @Param: [receiverUserId, msg]* @Return: void* @Create: 2019/4/18 17:33*/publicstaticvoidsendMsgOneWithChannelByMsg(StringreceiverUserId, Stringmsg) { SocketResultBeanresult=newSocketResultBean(); result.setMsg(msg); sendOne(receiverUserId, result); } /*** @Author Big Jin* @Description: 发送多个用户消息带留言* @Param: [receiverUserIdList, msg]* @Return: void* @Create: 2019/4/18 17:33*/publicstaticvoidsendMsgListWithChannelByMsg(List<String>receiverUserIdList, Stringmsg) { SocketResultBeanresult=newSocketResultBean(); result.setMsg(msg); sendList(receiverUserIdList, result); } /*** @Author Big Jin* @Description: 发送单个用户消息带留言和数据* @Param: [receiverUserId, msg, data]* @Return: void* @Create: 2019/7/26 16:34*/publicstaticvoidsendMsgOneWithChannelByMsgData(StringreceiverUserId, Stringmsg, Stringdata) { SocketResultBeanresult=newSocketResultBean(); result.setMsg(msg); result.setData(data); sendOne(receiverUserId, result); } /*** @Author Big Jin* @Description: 发送多个用户消息带留言和数据* @Param: [receiverUserIdList, msg, data]* @Return: void* @Create: 2019/7/26 16:34*/publicstaticvoidsendMsgListWithChannelByMsgData(List<String>receiverUserIdList, Stringmsg, Stringdata) { SocketResultBeanresult=newSocketResultBean(); result.setMsg(msg); result.setData(data); sendList(receiverUserIdList, result); } /*** @Author Big Jin* @Description: 发送单个用户* @Param: [receiverUserId, msgResp]* @Return: void* @Create: 2019/4/18 17:34*/publicstaticvoidsendOne(StringreceiverUserId, SocketResultBeanresult) { if(Global.channelMap.get(receiverUserId) !=null) { Map<ChannelId, Integer>channelIdMap=Global.channelMap.get(receiverUserId); if(channelIdMap!=null&&channelIdMap.size()>0) { List<Channel>channelList=newArrayList<>(); for (ChannelIdchannelId:channelIdMap.keySet()) { Channelch=Global.group.find(channelId); if(ch!=null) { channelList.add(ch); } } send(channelList,result); } } } /*** @Author Big Jin* @Description: 发送多个用户* @Param: [receiverUserIdList, msgResp]* @Return: void* @Create: 2019/4/18 17:34*/publicstaticvoidsendList(List<String>receiverUserIdList, SocketResultBeanresult) { List<Channel>channelList=newArrayList<>(); for(inti=0; i<receiverUserIdList.size(); i++){ StringreceiverUserId=receiverUserIdList.get(i); if(Global.channelMap.get(receiverUserId) !=null) { Map<ChannelId, Integer>channelIdMap=Global.channelMap.get(receiverUserId); if(channelIdMap!=null&&channelIdMap.size()>0) { for (ChannelIdchannelId:channelIdMap.keySet()) { Channelch=Global.group.find(channelId); if(ch!=null) { channelList.add(ch); } } } } } send(channelList,result); } /*** @Author Big Jin* @Description: 发送消息* @Param: [receiverUserId, result]* @Return: void* @Create: 2019/7/26 16:38*/privatestaticvoidsend(List<Channel>channelList, SocketResultBeanresult) { result.setStatus(2); // 业务逻辑处理StringresultStr=JSON.toJSONString(result); TextWebSocketFrametws=newTextWebSocketFrame(resultStr); if(channelList!=null&&channelList.size()>0) { ChannelGroupgroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);//群发对象group.addAll(channelList); group.writeAndFlush(tws); } } }
1、项目启动时,启动 Netty 监听器
// web.xml<listener><listener-class>com.wxgj.center.listener.NettyListener</listener-class></listener>
packagecom.wxgj.center.listener; importcom.wxgj.center.netty.NettyServer; importjavax.servlet.ServletContextEvent; importjavax.servlet.ServletContextListener; publicclassNettyListenerimplementsServletContextListener { @OverridepublicvoidcontextInitialized(ServletContextEventservletContextEvent) { NettyServern=newNettyServer(); n.initNetty(); } @OverridepublicvoidcontextDestroyed(ServletContextEventservletContextEvent) { } }
2、启动监听器后,就会调用 initNetty 方法
packagecom.wxgj.center.netty; importcom.wxgj.center.common.Const; importio.netty.bootstrap.ServerBootstrap; importio.netty.channel.Channel; importio.netty.channel.EventLoopGroup; importio.netty.channel.nio.NioEventLoopGroup; importio.netty.channel.socket.nio.NioServerSocketChannel; publicclassNettyServer { publicvoidinitNetty() { newThread() { publicvoidrun() { newNettyServer().run(); } }.start(); } /*** @Description: NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 在这个例子中我们实现了一个服务端的应用,* 因此会有2个NioEventLoopGroup会被使用。 第一个经常被叫做‘boss’,用来接收进来的连接。* 第二个经常被叫做‘worker’,用来处理已经被接收的连接, 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。* 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,* 并且可以通过构造函数来配置他们的关系。* @Param: []* @Return: void*/publicvoidrun() { System.out.println("=======Netty端口启动======="); // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池// 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socketEventLoopGroupbossGroup=newNioEventLoopGroup(); // Worker线程:Worker线程执行所有的异步I/O,即处理操作EventLoopGroupworkGroup=newNioEventLoopGroup(); try { // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求ServerBootstrapb=newServerBootstrap(); // 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常b.group(bossGroup, workGroup); // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类b.channel(NioServerSocketChannel.class); // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializerb.childHandler(newChildChannelHandler()); System.out.println("服务端开启等待客户端连接..."); // 绑定端口并启动去接收进来的连接Channelch=b.bind(Const.INET_PORT).sync().channel(); // 这里会一直等待,直到socket被关闭ch.closeFuture().sync(); } catch (Exceptione) { e.printStackTrace(); } finally { // 关闭bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
3、当执行到 b.childHandler(new ChildChannelHandler()); 时候,触发该类,配置一些必要的参数
packagecom.wxgj.center.netty; importio.netty.channel.ChannelInitializer; importio.netty.channel.socket.SocketChannel; importio.netty.handler.codec.http.HttpObjectAggregator; importio.netty.handler.codec.http.HttpServerCodec; importio.netty.handler.stream.ChunkedWriteHandler; importio.netty.handler.timeout.IdleStateHandler; publicclassChildChannelHandlerextendsChannelInitializer<SocketChannel> { /*** @Description: 设置频道特性* @Param: [e]* @Return: void*/@OverrideprotectedvoidinitChannel(SocketChannele) throwsException { // 设置300秒没有读到数据,则触发一个READER_IDLE事件。e.pipeline().addLast(newIdleStateHandler(300, 0, 0)); // HttpServerCodec:将请求和应答消息解码为HTTP消息e.pipeline().addLast("http-codec",newHttpServerCodec()); // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息e.pipeline().addLast("aggregator",newHttpObjectAggregator(65536)); // ChunkedWriteHandler:向客户端发送HTML5文件e.pipeline().addLast("http-chunked",newChunkedWriteHandler()); // 在管道中添加我们自己的接收数据实现方法e.pipeline().addLast("handler",newMyWebSocketServerHandler()); } }
4、当执行到 e.pipeline().addLast(new IdleStateHandler(300, 0, 0)); 时,过了 300s 没有读取到数据,则触发下面这个函数 userEventTriggered
packagecom.wxgj.center.netty; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.SimpleChannelInboundHandler; importio.netty.handler.codec.http.FullHttpRequest; importio.netty.handler.codec.http.websocketx.WebSocketFrame; importio.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; importjava.util.logging.Logger; publicclassMyWebSocketServerHandlerextendsSimpleChannelInboundHandler<Object> { privatestaticfinalLoggerlogger=Logger.getLogger(WebSocketServerHandshaker.class.getName()); privateWebSocketServerHandshakerhandshaker=null; /*** @Author Big Jin* @Description: 读写空闲调用* @Param: [ctx, evt]* @Return: void* @Create: 2019/9/24 10:11*/@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx, Objectevt) throwsException { //System.out.println("------------------------userEventTriggered------------------------");super.userEventTriggered(ctx, evt); } // ... 下面再补充完整}