Netty - 从启蒙到搬砖(完整应用篇)(后端篇)(下)

简介: Netty - 从启蒙到搬砖(完整应用篇)(后端篇)(下)

5、当执行到 e.pipeline().addLast("handler",new MyWebSocketServerHandler()); 时,触发下面这个函数,这一步是最重要的,因为是自己实现逻辑,文章末尾会补充它的生命周期常见情况

packagecom.wxgj.center.netty;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.SimpleChannelInboundHandler;
importio.netty.handler.codec.http.FullHttpRequest;
importio.netty.handler.codec.http.websocketx.WebSocketFrame;
importio.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
importjava.util.logging.Logger;
publicclassMyWebSocketServerHandlerextendsSimpleChannelInboundHandler<Object> {
privatestaticfinalLoggerlogger=Logger.getLogger(WebSocketServerHandshaker.class.getName());
privateWebSocketServerHandshakerhandshaker=null;
/*** @Author Big Jin* @Description: 读写空闲调用* @Param: [ctx, evt]* @Return: void* @Create: 2019/9/24 10:11*/@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx, Objectevt) throwsException {
//System.out.println("------------------------userEventTriggered------------------------");super.userEventTriggered(ctx, evt);
    }
/*** @Description: 【建立连接时调用】:channel 通道 action 活跃的。当客户端主动连接服务端的连接后,这个通道就是活跃的了。*               也就是客户端与服务端建立了通信通道并且可以传输数据* @Param: [ctx]* @Return: void*/@OverridepublicvoidchannelActive(ChannelHandlerContextctx) throwsException {
//System.out.println("------------------------channelActive------------------------");// 添加Global.group.add(ctx.channel());
System.out.println("客户端与服务端连接开启:"+ctx.channel().remoteAddress().toString());
    }
/*** @Description: 【进入掉线状态时调用】:channel 通道 Inactive 不活跃的。当客户端主动断开服务端的链接后,这个通道就是不活跃的。*               也就是说客户端与服务端关闭了通信通道并且不可以传输数据* @Param: [ctx]* @Return: void*/@OverridepublicvoidchannelInactive(ChannelHandlerContextctx) throwsException {
//System.out.println("------------------------channelInactive------------------------");// 移除Global.group.remove(ctx.channel());
Global.removeChannelMapByChannelId(ctx.channel().id());
System.out.println("客户端与服务端连接关闭:"+ctx.channel().remoteAddress().toString());
ctx.channel().close();
    }
/*** @Description: 【收到消息】:接收客户端发送的消息 channel 通道 Read 读 简而言之就是从通道中读取数据。*               也就是服务端接收客户端发来的数据。但是这个数据在不进行解码时它是ByteBuf类型的* @Param: [ctx, msg]* @Return: void*/@OverrideprotectedvoidmessageReceived(ChannelHandlerContextctx, Objectmsg) throwsException {
//System.out.println("------------------------messageReceived------------------------");if (msginstanceofFullHttpRequest) // 第一次HTTP接入        {
handshaker=NettyMessageHandler.handleHttpRequest(ctx, ((FullHttpRequest) msg));
        }
elseif (msginstanceofWebSocketFrame) // 后续WebSocket接入        {
NettyMessageHandler.handlerWebSocketFrame(handshaker, ctx, (WebSocketFrame) msg);
        }
    }
/*** @Description: 【完成时调用】:channel 通道 Read 读取 Complete 完成 在通道读取完成后会在这个方法里通知,对应可以做刷新操作 ctx.flush()* @Param: [ctx]* @Return: void*/@OverridepublicvoidchannelReadComplete(ChannelHandlerContextctx) throwsException {
//System.out.println("------------------------channelReadComplete------------------------");ctx.flush();
    }
/*** @Description: 【异常时调用】:exception 异常 Caught 抓住 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接* @Param: [ctx, cause]* @Return: void*/@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) throwsException {
//System.out.println("------------------------exceptionCaught------------------------");cause.printStackTrace();
ctx.close();
    }
/*** @Description:后台读取前端的消息* @Param: [ctx, msg]* @Return: void*/@OverridepublicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException {
//System.out.println("------------------------channelRead------------------------");super.channelRead(ctx, msg);
    }
}

6、此时特别注意其中一个生命周期函数 messageReceived,因为这里面的方法体也是实现自己的业务方法

packagecom.wxgj.center.netty;
importcom.alibaba.fastjson.JSON;
importcom.wxgj.center.common.Const;
importio.netty.buffer.ByteBuf;
importio.netty.buffer.Unpooled;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelFutureListener;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelId;
importio.netty.handler.codec.http.*;
importio.netty.handler.codec.http.websocketx.*;
importio.netty.util.CharsetUtil;
importjava.util.HashMap;
importjava.util.Map;
publicclassNettyMessageHandler {
/*** @Description: WebSocket消息处理器* @Param: [handshaker, ctx, frame]* @Return: void*/publicstaticvoidhandlerWebSocketFrame(WebSocketServerHandshakerhandshaker,ChannelHandlerContextctx, WebSocketFrameframe) {
// 判断是否关闭链路的指令if (frameinstanceofCloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
        }
// 判断是否ping消息if (frameinstanceofPingWebSocketFrame) {
ctx.channel().write(newPongWebSocketFrame(frame.content().retain()));
return;
        }
// 本例程仅支持文本消息,不支持二进制消息if (!(frameinstanceofTextWebSocketFrame)) {
System.out.println("仅支持文本消息,不支持二进制消息");
thrownewUnsupportedOperationException(
String.format("%s frame types not supported", frame.getClass().getName()));
        }
// 返回应答消息Stringrequest= ((TextWebSocketFrame) frame).text();
//System.out.println("服务端收到:" + request);SocketRequestBeansocketRequestBean=JSON.parseObject(request, SocketRequestBean.class);
SocketResultBeanresp=newSocketResultBean();
if(socketRequestBean.getOperand() ==Const.CONNECT){
// 建立链接登录、关联Map<ChannelId, Integer>channelIdMap=Global.channelMap.get(socketRequestBean.getUserId());//根据建立连接的用户id获取全部频道mapif(channelIdMap==null) {
channelIdMap=newHashMap<>();
            }
channelIdMap.put(ctx.channel().id(),0); //将新建立的频道放入对应map中。 0: 标记是否存在该对象Global.channelMap.put(socketRequestBean.getUserId(), channelIdMap);
resp.setStatus(1); // 成功        }
elseif(socketRequestBean.getOperand() ==Const.HEART){
// 心跳resp.setStatus(1);
        }
else{
resp.setStatus(0);
resp.setMsg("无效的操作数");
        }
// 返回应答消息Stringresult=JSON.toJSONString(resp);
TextWebSocketFrametws=newTextWebSocketFrame(result);
// 私发 处理完成后回传消息给请求通道的用户ctx.channel().writeAndFlush(tws);
// 群发//        Global.group.writeAndFlush(tws);    }
/*** @Description: HTTP消息处理器* @Param: [handshaker, ctx, req]* @Return: void*/publicstaticWebSocketServerHandshakerhandleHttpRequest(ChannelHandlerContextctx, FullHttpRequestreq) {
// 如果HTTP解码失败,返回HTTP异常// 第一次建立连接,协议规定,不是自定的if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req,
newDefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
returnnull;
        }
// 获取url后置参数/*HttpMethod method=req.getMethod();String uri=req.getUri();QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);Map<String, List<String>> parameters = queryStringDecoder.parameters();System.out.println(parameters.get("request").get(0));if(method==HttpMethod.GET && "/webssss".equals(uri)){//....处理ctx.attr(AttributeKey.valueOf("type")).set("anzhuo");}else if(method==HttpMethod.GET&&"/websocket".equals(uri)){//...处理ctx.attr(AttributeKey.valueOf("type")).set("live");}*/// 构造握手响应返回WebSocketServerHandshakerFactorywsFactory=newWebSocketServerHandshakerFactory(
"ws://"+req.headers().get(HttpHeaders.Names.HOST) +Const.WEBSOCKET_URL, null, false);
WebSocketServerHandshakerhandshaker=wsFactory.newHandshaker(req);
if (handshaker==null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        }
else {
handshaker.handshake(ctx.channel(), req);
        }
returnhandshaker;
    }
/*** @Description: 返回应答给客户端* @Param: [ctx, req, res]* @Return: void*/privatestaticvoidsendHttpResponse(ChannelHandlerContextctx, FullHttpRequestreq, DefaultFullHttpResponseres) {
if (res.getStatus().code() !=200) {
ByteBufbuf=Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf); // 请求已经返回前端buf.release();
        }
// 如果是非Keep-Alive,关闭连接ChannelFuturef=ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) ||res.getStatus().code() !=200) {
f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

7、接下来是 Controller 层的应用情景

packagecom.wxgj.center.web;
importcom.alibaba.fastjson.JSONObject;
importcom.github.pagehelper.util.StringUtil;
importcom.luxsuen.requestjson.annotation.RequestJson;
importcom.wxgj.center.common.ServerResponse;
importcom.wxgj.center.netty.NettyUtil;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.stereotype.Controller;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RequestMethod;
importorg.springframework.web.bind.annotation.ResponseBody;
importjava.net.URLDecoder;
importjava.net.URLEncoder;
importjava.util.List;
@Controller@RequestMapping("/ws/")
publicclassWebsocketController {
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
//type 1-家长申请数量变更通知/*** @Author Big Jin* @Description: 单发ws消息* @Param: [receiverUserId, msg]* @Return: com.wxgj.center.common.ServerResponse* @Create: 2019/9/9 14:14*/@RequestMapping(value="sendMsgOneWithChannel", method=RequestMethod.POST)
@ResponseBodypublicServerResponsesendMsgOneWithChannel(
@RequestJson(value="receiverUserId") StringreceiverUserId,
@RequestJson(value="msg") Stringmsg,
@RequestJson(value="type") Integertype,
@RequestJson(value="data") Stringdata){
try {
if(StringUtil.isEmpty(data)) {
NettyUtil.sendMsgOneWithChannelByMsg(receiverUserId, URLDecoder.decode(msg,"UTF-8"));
            }else{
JSONObjectjo=newJSONObject();
jo.put("type",type);
jo.put("data", JSONObject.parse(data));
NettyUtil.sendMsgOneWithChannelByMsgData(receiverUserId, URLDecoder.decode(msg,"UTF-8"),jo.toJSONString());
            }
returnServerResponse.createSuccByMsg("send Success");
        }catch (Exceptione){
returnServerResponse.createErrorByMsg(e.getMessage());
        }
    }
/*** @Author Big Jin* @Description: 群发ws消息* @Param: [receiverUserIdList, msg]* @Return: com.wxgj.center.common.ServerResponse* @Create: 2019/9/9 14:15*/@RequestMapping(value="sendMsgListWithChannel", method=RequestMethod.POST)
@ResponseBodypublicServerResponsesendMsgListWithChannel(
@RequestJson(value="receiverUserIdList") List<String>receiverUserIdList,
@RequestJson(value="msg") Stringmsg,
@RequestJson(value="type") Integertype,
@RequestJson(value="data") Stringdata){
try {
if(StringUtil.isEmpty(data)) {
NettyUtil.sendMsgListWithChannelByMsg(receiverUserIdList, URLDecoder.decode(msg,"UTF-8"));
            }else{
JSONObjectjo=newJSONObject();
jo.put("type",type);
jo.put("data", JSONObject.parse(data));
NettyUtil.sendMsgListWithChannelByMsgData(receiverUserIdList, URLDecoder.decode(msg,"UTF-8"),jo.toString());
            }
returnServerResponse.createSuccByMsg("send Success");
        }catch (Exceptione){
returnServerResponse.createErrorByMsg(e.getMessage());
        }
    }
}

Ps:比如,有新的数据进入数据库,那么在进入之后,用 HTTPClient doPost 在后台直接触发上面这个 Restful URI 即可

 

附:生命周期常见情况

接入:
channelActive——接入
channelRead——读取
messageReceived——处理(握手)
channelReadComplete——读取完毕



发送:

channelRead——读取

messageReceived——处理(逻辑)

channelReadComplete——读取完毕



关闭:

channelRead——读取

messageReceived——处理(关闭请求)

channelReadComplete——读取完毕

channelInactive——掉线



异常(逻辑处理异常):

channelRead——读取

messageReceived——处理(逻辑)

exceptionCaught——异常

channelReadComplete——读取完毕

channelInactive——掉线



异常(读取异常):

channelRead——读取

exceptionCaught——异常

channelReadComplete——读取完毕

channelInactive——掉线



异常(读取异常+掉线处理异常):

channelRead——读取

exceptionCaught——异常(读取)

channelReadComplete——读取完毕

channelInactive——掉线

exceptionCaught——异常(掉线处理)



异常(读取异常+异常处理异常):

channelRead——读取

exceptionCaught——异常(读取)

(插入了一条异常处理自己的异常)

channelReadComplete——读取完毕

channelInactive——掉线



目录
相关文章
|
11月前
|
Java 关系型数据库 API
探索后端技术:构建高效、可靠的服务器端应用
在当今数字化时代,后端技术是任何成功应用程序的基石。它涉及服务器、数据库和应用程序之间的交互,处理数据存储、业务逻辑和系统性能等关键任务。本文将深入探讨后端开发的核心概念、常见技术栈及其实际应用,帮助读者更好地理解和掌握构建高效、可靠后端系统的技巧与策略。
|
11月前
|
监控 中间件 Java
后端技术:构建高效、稳定的服务器端应用
【10月更文挑战第5天】后端技术:构建高效、稳定的服务器端应用
344 0
|
11月前
|
监控 关系型数据库 Serverless
探索后端技术:构建高效、可靠的服务器端应用
本文将深入探讨后端开发的核心概念和关键技术,从服务器架构到数据库管理,再到安全防护,为读者提供全面的后端技术指南。无论是初学者还是经验丰富的开发者,都能从中汲取灵感,提升自己的技术水平。
|
11月前
|
JavaScript 前端开发 API
探索后端技术:Node.js的优势和实际应用
【10月更文挑战第6天】 在当今数字化时代,后端开发是任何成功软件应用的关键组成部分。本文将深入探讨一种流行的后端技术——Node.js,通过分析其核心优势和实际应用案例,揭示其在现代软件开发中的重要性和潜力。
538 2
|
11月前
|
设计模式 算法 搜索推荐
后端开发中的设计模式应用与实践
在软件开发的广袤天地中,后端技术如同构筑高楼大厦的钢筋水泥,支撑起整个应用程序的骨架。本文旨在通过深入浅出的方式,探讨后端开发领域内不可或缺的设计模式,这些模式犹如精雕细琢的工具箱,能够助力开发者打造出既健壮又灵活的系统架构。从单例模式到工厂模式,从观察者模式到策略模式,每一种设计模式都蕴含着深刻的哲理与实践价值,它们不仅仅是代码的组织方式,更是解决复杂问题的智慧结晶。
|
11月前
|
算法 安全 关系型数据库
后端技术在现代软件开发中的重要性与应用
本文将深入探讨后端技术在现代软件开发中的关键作用及其广泛应用。我们将从后端开发的基本概念入手,逐步解析其在构建高性能、可扩展和安全的软件系统中的核心地位。通过具体案例,展示不同后端技术如何满足各种复杂业务需求,从而帮助企业实现数字化转型。最后,文章还将探讨未来后端技术的发展趋势,为开发者提供前瞻性的指导。
|
12月前
|
前端开发 JavaScript API
后端技术在现代软件开发中的应用与挑战
本文将深入探讨后端技术在当前软件开发中的重要性及其面临的主要挑战。通过分析后端技术的发展脉络,揭示其在数据处理、业务逻辑和系统安全等方面的关键作用。同时,本文还将讨论如何在快速变化的技术环境中保持后端技术的先进性和竞争力。
160 6
|
12月前
|
设计模式 算法 搜索推荐
后端开发中的设计模式应用
在软件开发的浩瀚海洋中,设计模式犹如一座座灯塔,为后端开发者指引方向。本文将深入探讨后端开发中常见的设计模式,并通过实例展示如何在实际项目中巧妙应用这些模式,以提升代码的可维护性、扩展性和复用性。通过阅读本文,您将能够更加自信地应对复杂后端系统的设计与实现挑战。
189 3
|
11月前
|
存储 安全 关系型数据库
后端技术深度剖析:构建高效稳定的企业级应用
【10月更文挑战第5天】后端技术深度剖析:构建高效稳定的企业级应用
230 0
|
10月前
|
JavaScript 前端开发 API
深入理解Node.js事件循环及其在后端开发中的应用
本文旨在揭示Node.js的核心特性之一——事件循环,并探讨其对后端开发实践的深远影响。通过剖析事件循环的工作原理和关键组件,我们不仅能够更好地理解Node.js的非阻塞I/O模型,还能学会如何优化我们的后端应用以提高性能和响应能力。文章将结合实例分析事件循环在处理大量并发请求时的优势,以及如何避免常见的编程陷阱,从而为读者提供从理论到实践的全面指导。