Netty的使用

简介: 一个好的通信框架是怎样的?同时如何使用netty?一个好的通信框架,必然是支持双工通信的,同时它能够对半包黏包进行处理,方便高效的编解码、序列化,拥有心跳超时机制,同时支持大量连接并发,方便调用。而这个通信的过程,我始终是觉得它的起源是三次握手和四次挥手。它们影响着消息中间件和通信框架以及SOA框架的发展。netty最简单的是它的EchoServer和EchoClient,两者有同时有自己对应的处理器EchoServerHandler、EchoClientHandler。

1.问题

一个好的通信框架是怎样的?同时如何使用netty?

一个好的通信框架,必然是支持双工通信的,同时它能够对半包黏包进行处理,方便高效的编解码、序列化,拥有心跳超时机制,同时支持大量连接并发,方便调用。而这个通信的过程,我始终是觉得它的起源是三次握手和四次挥手。它们影响着消息中间件和通信框架以及SOA框架的发展。

2.netty中的echo例子

netty最简单的是它的EchoServer和EchoClient,两者有同时有自己对应的处理器EchoServerHandler、EchoClientHandler。

在EchoServer中,通常需要创建两个线程boss线程组和worker线程组。而在于创建线程组NioEventLoop的过程中,会执行openSelector()方法。此时会开启多路复用器。因此这个时候联想到如果使用java的nio的时候,必然需要首先创建需要连接的对象,然后根据连接对象,打开多路复用选择器。然后执行读写方法。而Netty的思想就是这样的。首先创建好连接,然后根据连接执行,然后执行读事件。而这个过程的执行都会经过processSelectedKeys处理选中的keys,也即事件。当如果写满了,没法写的时候,此时就会注册写事件,当可以写的时候,然后会执行写操作。

读写是在I/O 事件由 ChannelInboundHandler 或ChannelOutboundHandler}处理并通过调用中定义的事件传播方法转发到ChannelHandlerContext,例如 ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object),进行数据的处理和写操作。

入站InBound数据通常是通过事件输入操作从远程读取的,如:SocketChannel#read(Buffer)。

出站OutBound处理程序通常会生产或者转换出站数据,如写入数据,I/O线程经常执行实际的输出操作,如SocketChannel#write(ByteBuffer)

Netty中的相关事件ops:

OP_READ=1<<0; 读事件1OP_WRITE=1<<2; 写事件4OP_CONNECT=1<<3; 连接事件8OP_ACCEPT=1<<4; accpet事件16

3.业务中使用Netty

这里不对netty整合spring,同时设置编解码、相应的请求code和响应code,这里就不进行展开了。

在血透业务中,对接测温仪,首先需要采集人脸图片信息,方便人脸识别来测定人体温的时候,存储起来。

微信图片_20221214032925.png

业务处理整合Netty

为了将体温仪嵌入到我们的业务中,使用了Netty来保证高性能的信息通信和传输。自然我们需要考虑的首先是netty如何加入到血透系统中。

考虑到设备方需要进行人脸识别的对比,因此首先需要采集人脸数据,然后对人脸数据进行保存。然后后期以此进行对比。比对识别成功,则可以进行体温的测量,否则需要先进行人脸采集,然后进行体温的测量。

首先启动netty的过程中需要考虑加入到服务中,然后在启动nettyServer,而在启动的过程中,需要考虑传输过程中的编解码问题,这里需要根据传入信息的大小,进行内存的分配,为了合理的使用,分配一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又足够小不要浪费其空间。

//设置bossGroup组配置serverBootstrap=serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, newAdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置workerGroup组配置serverBootstrap=serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, newAdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置处理事件链 进行编解码的设置以及适配器,考虑后期的扩展serverBootstrap=serverBootstrap.childHandler(newNettyChannelInitializer<SocketChannel>());

而适配器中,是我们进行业务处理的关键,执行入站操作。为什么是入站?

在netty中,入站是消息进入接收缓冲区,而出站是消息从发送缓冲区中刷出。也即入站操作主要是指读取数据的操作;而出站操作主要是指写入数据的操作。由于我们需要从设备方中读取数据,因此是入站操作。

那么这个过程首先需要请求设备方,然后根据设备方的响应,从而执行具体业务逻辑的处理。因为业务是基于人脸来来识别的,因此第一个信息是基于人脸的,服务端如果识别人脸,则基于深度学习根据人脸识别特征匹配的人脸,则可以根据人脸信息的数据信息,来记录人的数据信息推送到客户端,而客户端可以根据服务端的信息,进行记录。在这个过程中,同时支持进行离线识别上传的情况。

4.具体业务处理代码

这里省略编解码、序列化,只截取逻辑上的一部分:

@OverridepublicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException {
super.channelRead(ctx, msg);
Messagemessage= (Message) msg;
MessageDatamessageData=JSONObject.parseObject(message.getData(), MessageData.class);
//创建响应,设置响应信息Responseresponse=newResponse();
response.setResult(CmdConstant.resultStatus);
response.setDesc(CmdConstant.descStr);
response.setMessageId(messageData.getMessageId());
log.debug("客户端消息:"+JSONObject.toJSONString(message));
autowired();
switch (message.getCmd()) {
caseCmdConstant.loginReq:
// 设备登录(login) 客户端请求服务端log.info("设备登录");
ChannelMap.channels.add(ctx.channel());
response.setMessage(CmdConstant.loginRspStr);
//将信息推送到设备中PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.loginRsp, response);
break;
caseCmdConstant.registerReq:
// 设备注册(register)log.info("设备注册");
RegisterRespregisterResp=newRegisterResp();
registerResp.setMachId("123456778888");
registerResp.setPasswd("123456");
response.setData(registerResp);
response.setMessage(CmdConstant.registerRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.registerRsp, response);
break;
caseCmdConstant.heartBeatReq:
// 设备心跳log.debug("设备心跳");
HeartBeatRespheartBeatResp=newHeartBeatResp();
response.setData(heartBeatResp);
response.setMessage(CmdConstant.heartbeatRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
break;
caseCmdConstant.getUserReq:
log.info("获取人员信息");
UserRequserReq=getUserReq(messageData);
response.setData(userReq);
response.setMessage(CmdConstant.getUserRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.getUserRsp, response);
break;
// 服务端传过来的,进行解析,然后入库caseCmdConstant.snapshotFaceReq:
log.info("上传识别记录");
SnapshotFacesnapshotFace=JSONObject.parseObject(messageData.getData(), SnapshotFace.class);
if (snapshotFace!=null) {   
LongfkPatientId=PushCmdUtils.intToPatientId(Integer.valueOf(snapshotFace.getUserId()), sysTenantService.getDefaultId());
HdPatienthdPatient=hdPatientManager.selectByPrimaryKey(fkPatientId);
BigDecimaltemperature=BigDecimal.valueOf(Double.parseDouble(snapshotFace.getTemperature()));
if (Objects.nonNull(hdPatient)) {
//更新患者透析体温数据,此时保存体温信息hdDrBaseService.saveTemperature(fkPatientId, temperature);
                    } 
                }
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
             ... //省略部分逻辑//离线上传和识别,通过系统    caseCmdConstant.offlineFaceReq:
log.info("离线识别记录上传(带图片)");
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
caseCmdConstant.offlineNoFaceReq:
log.info("离线识别记录上传(无图片)");
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
default:
log.info("客户端消息:"+JSONObject.toJSONString(message));
response.setData(newHashMap<>());
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
log.info("无法识别命令");
break;
        }
    }

而服务端会根据客户端请求的reqCode来进行对应的处理,使用处理器进行处理。而这种处理方式在使用Netty的RocketMQ和sofa-bolt中是可以看到的。那么你是不是很好奇netty的应用,同时方便netty的使用。

下一篇来看sofa-bolt是如何封装netty来给我们带来方便使用的。

目录
相关文章
|
7月前
netty
netty
38 0
|
1月前
|
存储 Java API
Netty指南
Netty指南
39 2
|
3月前
|
设计模式 网络协议 Java
Netty | 一起来了解了解Netty吧
Netty | 一起来了解了解Netty吧
31 0
|
4月前
|
监控 前端开发 Java
Netty使用篇
Netty使用篇
|
5月前
|
安全 Java Linux
Netty4 使用总结
Netty4 使用总结
31 0
|
7月前
netty练习
netty练习
27 0
|
8月前
|
消息中间件 网络协议 Dubbo
Netty是什么,为什么要使用Netty?
最近,也不知道什么原因,经常有粉丝问我关于Netty的问题。难道是大厂面试更卷了,开始关注更加底层的框架了?先不深究什么原因了,今天,我给大家分享一下什么是Netty,它能解决什么问题?
46 0
|
11月前
|
JSON 移动开发 网络协议
Netty应用篇
Netty应用篇
62 0
|
设计模式 安全 Java
Netty是什么?怎么学?
Netty是什么?怎么学?
135 0
|
消息中间件 分布式计算 网络协议
Netty之初识
Netty之初识
89 0