这里有点需要注意:
当有多个客户端连上来时,服务端需要区分开,不然响应消息就会发生混乱。
所以每当有个连接上来的时候,我们都将当前的 Channel 与连上的客户端 ID 进行关联(因此每个连上的客户端 ID 都必须唯一)。
这里采用了一个 Map 来保存这个关系,并且在断开连接时自动取消这个关联。
public class NettySocketHolder { private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16); public static void put(Long id, NioSocketChannel socketChannel) { MAP.put(id, socketChannel); } public static NioSocketChannel get(Long id) { return MAP.get(id); } public static Map<Long, NioSocketChannel> getMAP() { return MAP; } public static void remove(NioSocketChannel nioSocketChannel) { MAP.entrySet().stream().filter(entry -> entry.getValue() == nioSocketChannel).forEach(entry -> MAP.remove(entry.getKey())); } }
启动引导程序:
Component public class HeartBeatServer { private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatServer.class); private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); @Value("${netty.server.port}") private int nettyPort; /** * 启动 Netty * * @return * @throws InterruptedException */ @PostConstruct public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(nettyPort)) //保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new HeartbeatInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty 成功"); } } /** * 销毁 */ @PreDestroy public void destroy() { boss.shutdownGracefully().syncUninterruptibly(); work.shutdownGracefully().syncUninterruptibly(); LOGGER.info("关闭 Netty 成功"); } } public class HeartbeatInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 中 .addLast(new IdleStateHandler(5, 0, 0)) .addLast(new HeartbeatDecoder()) .addLast(new HeartBeatSimpleHandle()); } }
也是同样将IdleStateHandler 添加到 ChannelPipeline 中,也会有一个定时任务,每5秒校验一次是否有收到消息,否则就主动发送一次请求。
因为测试是有两个客户端连上所以有两个日志。
自定义协议
上文其实都看到了:服务端与客户端采用的是自定义的 POJO 进行通讯的。
所以需要在客户端进行编码,服务端进行解码,也都只需要各自实现一个编解码器即可。
CustomProtocol:
public class CustomProtocol implements Serializable{ private static final long serialVersionUID = 4671171056588401542L; private long id ; private String content ; //省略 getter/setter }
客户端的编码器:
public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> { @Override protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception { out.writeLong(msg.getId()) ; out.writeBytes(msg.getContent().getBytes()) ; } }
也就是说消息的前八个字节为 header,剩余的全是 content。
服务端的解码器:
public class HeartbeatDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { long id = in.readLong() ; byte[] bytes = new byte[in.readableBytes()] ; in.readBytes(bytes) ; String content = new String(bytes) ; CustomProtocol customProtocol = new CustomProtocol() ; customProtocol.setId(id); customProtocol.setContent(content) ; out.add(customProtocol) ; } }
只需要按照刚才的规则进行解码即可。
实现原理
其实联想到 IdleStateHandler 的功能,自然也能想到它实现的原理:
应该会存在一个定时任务的线程去处理这些消息。
来看看它的源码:
首先是构造函数:
public IdleStateHandler( int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS); }
其实就是初始化了几个数据:
- readerIdleTimeSeconds:一段时间内没有数据读取
- writerIdleTimeSeconds:一段时间内没有数据发送
- allIdleTimeSeconds:以上两种满足其中一个即可
因为 IdleStateHandler 也是一种 ChannelHandler,所以会在 channelActive
中初始化任务:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added // before channelActive() event is fired. If a user adds this handler // after the channelActive() event, initialize() will be called by beforeAdd(). initialize(ctx); super.channelActive(ctx); } private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
也就是会按照我们给定的时间初始化出定时任务。