netty客户端为多实例,每个实例绑定一个线程,持续阻塞到客户端关闭为止,每个客户端中可以保存自己的业务数据,以便在后续与服务端交互时处理业务使用。客户端执行连接时,给了2次重试的机会,如果3次都没连接成功则放弃。后续可以选择将该消息重新入列消费。我们实际项目中,此处还应该预先给服务端发送一条登录消息,待服务端确认后才能执行后续通讯,这需要视实际情况进行调整。
另一个需要注意的点是EventLoopGroup
是从构造函数传入的,而不是在客户端中创建的,因为当客户端数量非常多时,每个客户端都创建自己的线程组会极大的消耗服务器资源,因此我们在实际使用中是按业务去创建统一的线程组给该业务下的所有客户端共同使用的,线程组的大小需要根据业务需求灵活配置。
在init方法中,我们给客户端加上了一个handler来处理与服务端的交互,下面来看一下具体实现。
package org.example.client.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.example.client.NettyClient; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Map; /** * @author ReWind00 * @date 2023/2/15 10:09 */ @Slf4j @Component @Scope("prototype") public class DemoClientHandler extends BaseClientHandler { private final String imei; private final Map<String, Object> bizData; private final NettyClient nettyClient; private int allIdleCounter = 0; private static final int MAX_IDLE_TIMES = 3; public DemoClientHandler(NettyClient nettyClient) { this.nettyClient = nettyClient; this.imei = nettyClient.getImei(); this.bizData = nettyClient.getBizData(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客户端imei={},通道激活成功", this.imei); synchronized (this.nettyClient) { //当通道激活后解锁队列线程,然后再发送消息 this.nettyClient.notify(); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.warn("客户端imei={},通道断开连接", this.imei); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客户端imei={},收到消息: {}", this.imei, msg); //处理业务... if ("shutdown".equals(msg)) { this.nettyClient.close(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; boolean flag = false; if (e.state() == IdleState.ALL_IDLE) { this.allIdleCounter++; log.info("客户端imei={}触发闲读或写第{}次", this.imei, this.allIdleCounter); if (this.allIdleCounter >= MAX_IDLE_TIMES) { flag = true; } } if (flag) { log.warn("读写超时达到{}次,主动断开连接", MAX_IDLE_TIMES); ctx.channel().close(); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("客户端imei={},连接异常{}", imei, cause.getMessage(), cause); } }
DemoClientHandler
也是多实例bean,每个实例持有自己的NettyClient引用,以便在后续处理具体业务。在channelActive方法中,我们可以看到执行了客户端实例的notify方法,此处就是在客户端创建成功并且通道激活后解除wait锁的地方。channelRead方法就是我们处理服务端发送过来的消息的方法,我们的具体业务应该在该方法执行,当然不建议长时间阻塞客户端的工作线程,可以考虑异步处理。
最后我们看一下客户端缓存类。
package org.example.client; import java.util.concurrent.ConcurrentHashMap; /** * @author ReWind00 * @date 2023/2/15 11:01 */ public class NettyClientHolder { private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>(); public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; } }
由于netty的通道无法序列化,因此不能存入redis,只能缓存在本地内存中,其本质就是一个ConcurrentHashMap。
五、测试
package org.example.client.controller; import org.example.client.QueueHolder; import org.example.client.model.NettyMsgModel; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @author ReWind00 * @date 2023/2/15 13:48 */ @RestController @RequestMapping("/demo") public class DemoController { /** * 间隔发送两条消息 */ @GetMapping("testOne") public void testOne() { QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!")); } /** * 任意发送消息 * * @param imei * @param msg */ @GetMapping("testTwo") public void testTwo(@RequestParam String imei, @RequestParam String msg) { QueueHolder.get().offer(NettyMsgModel.create(imei, msg)); } /** * 连续发送两条消息 第二条由于redis锁将会重新放回队列延迟消费 */ @GetMapping("testThree") public void testThree() { QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!")); QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!")); } }
测试接口代码如上,调用testOne,日志如下:
可以看到第一条消息触发了客户端创建流程,创建后发送了消息,而5秒后的第二条消息直接通过已有通道发送了。
测试接口代码如上,调用testTwo,日志如下:
发送shutdown可以主动断开已有连接。
测试接口代码如上,调用testThree,日志如下:
可以看到第二条消息重新入列并被延迟消费了。
六、源码
https://gitee.com/jaster/netty-tcp-demo
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
后记
本demo项目仅作学习交流使用,如果要应用到生产环境还有些许不足,有问题的同学可以留言交流。