5、当执行到 e.pipeline().addLast("handler", new MyWebSocketServerHandler()); 时,会把下面这个处理器类注册到该连接的管道中,之后连接上的各类事件都会回调它的方法。这一步是最重要的,因为业务逻辑由自己实现;文章末尾会补充它的生命周期常见情况
package com.wxgj.center.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Inbound handler that performs the WebSocket handshake on the first HTTP
 * request and then forwards every subsequent WebSocket frame to
 * {@link NettyMessageHandler}.
 *
 * <p>The handler keeps the handshaker returned by the handshake in an instance
 * field, so it holds per-connection state.
 */
public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    // Named after this class so log records are attributed to the handler itself.
    private static final Logger logger = Logger.getLogger(MyWebSocketServerHandler.class.getName());

    /** Handshaker produced by the HTTP upgrade; stays {@code null} until a handshake succeeds. */
    private WebSocketServerHandshaker handshaker = null;

    /**
     * Called for user events; the original author (Big Jin, 2019/9/24) notes that
     * read/write idle events arrive here. Currently only delegates to the superclass.
     *
     * @param ctx the handler context
     * @param evt the user event
     * @throws Exception propagated from the superclass
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Called when the connection is established (the channel becomes active):
     * registers the channel in {@code Global.group} and prints the remote address.
     *
     * @param ctx the handler context
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Global.group.add(ctx.channel());
        // Plain concatenation instead of remoteAddress().toString(): same output,
        // but does not throw if the address happens to be null.
        System.out.println("客户端与服务端连接开启:" + ctx.channel().remoteAddress());
    }

    /**
     * Called when the connection goes offline (the channel becomes inactive):
     * removes the channel from {@code Global.group} and from the per-user channel
     * map, prints the remote address and closes the channel.
     *
     * @param ctx the handler context
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Global.group.remove(ctx.channel());
        Global.removeChannelMapByChannelId(ctx.channel().id());
        System.out.println("客户端与服务端连接关闭:" + ctx.channel().remoteAddress());
        ctx.channel().close();
    }

    /**
     * Called for every inbound message. A {@link FullHttpRequest} is the initial
     * HTTP access and triggers the handshake; a {@link WebSocketFrame} is regular
     * WebSocket traffic and is handed to the business handler. Any other message
     * type is ignored.
     *
     * @param ctx the handler context
     * @param msg the decoded inbound message
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // First access over HTTP: perform the upgrade and remember the handshaker.
            handshaker = NettyMessageHandler.handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // Subsequent WebSocket traffic.
            NettyMessageHandler.handlerWebSocketFrame(handshaker, ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Called when the current read has completed; flushes pending writes.
     *
     * @param ctx the handler context
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Called when an exception reaches this handler: logs it and closes the connection.
     *
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.log(Level.WARNING, "Unhandled exception on channel " + ctx.channel() + ", closing it", cause);
        ctx.close();
    }

    /**
     * Reads a message sent by the front end; only delegates to the superclass.
     *
     * @param ctx the handler context
     * @param msg the inbound message
     * @throws Exception propagated from the superclass
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }
}
6、此时特别注意其中一个生命周期函数 messageReceived:它的方法体只负责分发,真正的业务逻辑写在下面这个自己实现的 NettyMessageHandler 类中
package com.wxgj.center.netty;

import com.alibaba.fastjson.JSON;
import com.wxgj.center.common.Const;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers containing the business logic of the WebSocket server: the
 * HTTP upgrade handshake and the handling of WebSocket frames.
 */
public class NettyMessageHandler {

    /**
     * Handles one WebSocket frame.
     *
     * <p>Close frames close the connection, ping frames are answered with a pong,
     * and text frames are parsed as a JSON {@code SocketRequestBean} whose operand
     * selects the action; a JSON {@code SocketResultBean} is written back to the
     * sending channel.
     *
     * @param handshaker the handshaker created during the HTTP upgrade; may be
     *                   {@code null} when no handshake succeeded
     * @param ctx        the handler context of the receiving channel
     * @param frame      the frame to handle
     * @throws UnsupportedOperationException if the frame is neither a close, ping
     *                                       nor text frame
     */
    public static void handlerWebSocketFrame(WebSocketServerHandshaker handshaker,
                                             ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Request to close the link.
        if (frame instanceof CloseWebSocketFrame) {
            if (handshaker == null) {
                // No handshaker to run the closing handshake with: just drop the connection.
                ctx.close();
            } else {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }

        // Ping message: answer with a pong carrying the same payload.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        // This example supports text messages only, not binary ones.
        if (!(frame instanceof TextWebSocketFrame)) {
            System.out.println("仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }

        String request = ((TextWebSocketFrame) frame).text();
        // The parser can yield null (e.g. for empty text); that case is answered
        // below as an invalid operand instead of failing with an NPE.
        SocketRequestBean socketRequestBean = JSON.parseObject(request, SocketRequestBean.class);
        SocketResultBean resp = new SocketResultBean();

        // NOTE(review): '==' is only correct if getOperand() and the Const values are
        // primitives or the same cached boxed instances — confirm their declared types.
        if (socketRequestBean != null && socketRequestBean.getOperand() == Const.CONNECT) {
            // Login: associate this channel with the connecting user.
            // NOTE(review): Global.channelMap and this HashMap appear to be shared
            // between connections — verify thread-safety against Global.
            Map<ChannelId, Integer> channelIdMap = Global.channelMap.get(socketRequestBean.getUserId());
            if (channelIdMap == null) {
                channelIdMap = new HashMap<>();
            }
            // The value 0 is only a presence marker.
            channelIdMap.put(ctx.channel().id(), 0);
            Global.channelMap.put(socketRequestBean.getUserId(), channelIdMap);
            resp.setStatus(1); // success
        } else if (socketRequestBean != null && socketRequestBean.getOperand() == Const.HEART) {
            // Heartbeat.
            resp.setStatus(1);
        } else {
            resp.setStatus(0);
            resp.setMsg("无效的操作数");
        }

        // Reply privately to the channel that sent the request.
        // (To broadcast instead, write the frame to Global.group.)
        String result = JSON.toJSONString(resp);
        TextWebSocketFrame tws = new TextWebSocketFrame(result);
        ctx.channel().writeAndFlush(tws);
    }

    /**
     * Handles the initial HTTP request and performs the WebSocket handshake.
     *
     * <p>If HTTP decoding failed or the request is not a WebSocket upgrade, a
     * 400 response is sent. If the requested WebSocket version is unsupported,
     * the "unsupported version" response is sent.
     *
     * @param ctx the handler context of the receiving channel
     * @param req the full HTTP request
     * @return the handshaker used for the handshake, or {@code null} when the
     *         request was rejected or the WebSocket version is unsupported
     */
    public static WebSocketServerHandshaker handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // The upgrade token is compared case-insensitively, as the WebSocket
        // protocol requires; equalsIgnoreCase(null) is false, so a missing header
        // is rejected as well.
        if (!req.getDecoderResult().isSuccess()
                || !"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return null;
        }

        // URL query parameters, if needed, could be read here from req.getUri()
        // with a QueryStringDecoder and stored as channel attributes.

        // Build the handshake response.
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://" + req.headers().get(HttpHeaders.Names.HOST) + Const.WEBSOCKET_URL, null, false);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
        return handshaker;
    }

    /**
     * Writes an HTTP response to the client.
     *
     * <p>For non-200 responses the status line is used as the body. The connection
     * is closed after the write unless the request is keep-alive and the status is 200.
     *
     * @param ctx the handler context of the receiving channel
     * @param req the request being answered
     * @param res the response to send
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
                                         DefaultFullHttpResponse res) {
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            // The bytes were copied into the response content; release the temporary buffer.
            buf.release();
        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
7、接下来是 Controller 层的应用情景
package com.wxgj.center.web;

import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.util.StringUtil;
import com.luxsuen.requestjson.annotation.RequestJson;
import com.wxgj.center.common.ServerResponse;
import com.wxgj.center.netty.NettyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.List;

/**
 * HTTP endpoints that push a WebSocket message to one user or to a list of users.
 *
 * <p>Known {@code type} values: 1 - notification that the number of parent
 * applications has changed.
 */
@Controller
@RequestMapping("/ws/")
public class WebsocketController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Sends a WebSocket message to a single user (author: Big Jin, 2019/9/9).
     *
     * @param receiverUserId id of the receiving user
     * @param msg            URL-encoded (UTF-8) message text
     * @param type           message type, only used when {@code data} is present
     * @param data           optional JSON text attached to the message
     * @return a success response, or an error response carrying the exception message
     */
    @RequestMapping(value = "sendMsgOneWithChannel", method = RequestMethod.POST)
    @ResponseBody
    public ServerResponse sendMsgOneWithChannel(
            @RequestJson(value = "receiverUserId") String receiverUserId,
            @RequestJson(value = "msg") String msg,
            @RequestJson(value = "type") Integer type,
            @RequestJson(value = "data") String data) {
        try {
            if (StringUtil.isEmpty(data)) {
                NettyUtil.sendMsgOneWithChannelByMsg(receiverUserId, URLDecoder.decode(msg, "UTF-8"));
            } else {
                String payload = buildPayload(type, data);
                NettyUtil.sendMsgOneWithChannelByMsgData(
                        receiverUserId, URLDecoder.decode(msg, "UTF-8"), payload);
            }
            return ServerResponse.createSuccByMsg("send Success");
        } catch (Exception e) {
            // Log with the stack trace; the response only carries the message.
            logger.error("Failed to send WebSocket message to user {}", receiverUserId, e);
            return ServerResponse.createErrorByMsg(e.getMessage());
        }
    }

    /**
     * Sends a WebSocket message to a list of users (author: Big Jin, 2019/9/9).
     *
     * @param receiverUserIdList ids of the receiving users
     * @param msg                URL-encoded (UTF-8) message text
     * @param type               message type, only used when {@code data} is present
     * @param data               optional JSON text attached to the message
     * @return a success response, or an error response carrying the exception message
     */
    @RequestMapping(value = "sendMsgListWithChannel", method = RequestMethod.POST)
    @ResponseBody
    public ServerResponse sendMsgListWithChannel(
            @RequestJson(value = "receiverUserIdList") List<String> receiverUserIdList,
            @RequestJson(value = "msg") String msg,
            @RequestJson(value = "type") Integer type,
            @RequestJson(value = "data") String data) {
        try {
            if (StringUtil.isEmpty(data)) {
                NettyUtil.sendMsgListWithChannelByMsg(receiverUserIdList, URLDecoder.decode(msg, "UTF-8"));
            } else {
                String payload = buildPayload(type, data);
                NettyUtil.sendMsgListWithChannelByMsgData(
                        receiverUserIdList, URLDecoder.decode(msg, "UTF-8"), payload);
            }
            return ServerResponse.createSuccByMsg("send Success");
        } catch (Exception e) {
            // Log with the stack trace; the response only carries the message.
            logger.error("Failed to send WebSocket message to users {}", receiverUserIdList, e);
            return ServerResponse.createErrorByMsg(e.getMessage());
        }
    }

    /** Wraps the message type and the parsed {@code data} JSON into one JSON string. */
    private static String buildPayload(Integer type, String data) {
        JSONObject jo = new JSONObject();
        jo.put("type", type);
        jo.put("data", JSONObject.parse(data));
        return jo.toJSONString();
    }
}
PS:例如,当有新数据写入数据库之后,在后台用 HttpClient 发起一次 POST 请求(doPost),直接调用上面这个 RESTful 接口即可触发推送
附:生命周期常见情况
接入:
channelActive——接入
channelRead——读取
messageReceived——处理(握手)
channelReadComplete——读取完毕
发送:
channelRead——读取
messageReceived——处理(逻辑)
channelReadComplete——读取完毕
关闭:
channelRead——读取
messageReceived——处理(关闭请求)
channelReadComplete——读取完毕
channelInactive——掉线
异常(逻辑处理异常):
channelRead——读取
messageReceived——处理(逻辑)
exceptionCaught——异常
channelReadComplete——读取完毕
channelInactive——掉线
异常(读取异常):
channelRead——读取
exceptionCaught——异常
channelReadComplete——读取完毕
channelInactive——掉线
异常(读取异常+掉线处理异常):
channelRead——读取
exceptionCaught——异常(读取)
channelReadComplete——读取完毕
channelInactive——掉线
exceptionCaught——异常(掉线处理)
异常(读取异常+异常处理异常):
channelRead——读取
exceptionCaught——异常(读取)
(插入了一条异常处理自己的异常)
channelReadComplete——读取完毕
channelInactive——掉线