前言
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
最终能达到的效果:
- 客户端每隔 N 秒检测是否需要发送心跳。
- 服务端也每隔 N 秒检测是否需要发送心跳。
- 服务端可以主动 push 消息到客户端。
- 基于 SpringBoot 监控,可以查看实时连接以及各种应用信息。
效果如下:
IdleStateHandler
Netty 可以使用 IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送、接收消息)时则会触发一个事件,我们便可在该事件中实现心跳机制。
客户端心跳
当客户端空闲了 N 秒没有给服务端发送消息时会自动发送一个心跳来维持连接。
核心代码代码如下:
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> { private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; if (idleStateEvent.state() == IdleState.WRITER_IDLE){ LOGGER.info("已经 10 秒没有发送信息!"); //向服务端发送消息 CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class); ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ; } } super.userEventTriggered(ctx, evt); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception { //从服务端收到消息时被调用 LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ; } }
实现非常简单,只需要在事件回调中发送一个消息即可。
由于整合了 SpringBoot ,所以发送的心跳信息是一个单例的 Bean。
@Configuration public class HeartBeatConfig { @Value("${channel.id}") private long id ; @Bean(value = "heartBeat") public CustomProtocol heartBeat(){ return new CustomProtocol(id,"ping") ; } }
这里涉及到了自定义协议的内容,请继续查看下文。
当然少不了启动引导:
@Component public class HeartbeatClient { private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class); private EventLoopGroup group = new NioEventLoopGroup(); @Value("${netty.server.port}") private int nettyPort; @Value("${netty.server.host}") private String host; private SocketChannel channel; @PostConstruct public void start() throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new CustomerHandleInitializer()) ; ChannelFuture future = bootstrap.connect(host, nettyPort).sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty 成功"); } channel = (SocketChannel) future.channel(); } } public class CustomerHandleInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() //10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中 .addLast(new IdleStateHandler(0, 10, 0)) .addLast(new HeartbeatEncode()) .addLast(new EchoClientHandle()) ; } }
所以当应用启动每隔 10 秒会检测是否发送过消息,不然就会发送心跳信息。
服务端心跳
服务器端的心跳其实也是类似,也需要在 ChannelPipeline 中添加一个 IdleStateHandler 。
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomProtocol> { private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class); private static final ByteBuf HEART_BEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8)); /** * 取消绑定 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettySocketHolder.remove((NioSocketChannel) ctx.channel()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; if (idleStateEvent.state() == IdleState.READER_IDLE){ LOGGER.info("已经5秒没有收到信息!"); //向客户端发送消息 ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ; } } super.userEventTriggered(ctx, evt); } @Override protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception { LOGGER.info("收到customProtocol={}", customProtocol); //保存客户端与 Channel 之间的关系 NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ; } }