Spring Boot 整合 Netty 框架实现心跳检测和自动重连
1. 引入依赖
在 pom.xml
中添加 Netty 和 Spring Boot 相关依赖。
<dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Netty Dependency --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.63.Final</version> </dependency> <!-- 其他相关依赖 --> </dependencies>
2. 配置 Netty 服务端
创建一个 Netty 服务器启动类,配置心跳检测机制。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class NettyServer implements CommandLineRunner { private final int port = 8080; @Override public void run(String... args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS)); ch.pipeline().addLast(new HeartbeatHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
3. 实现心跳检测处理器
创建一个 HeartbeatHandler
类处理心跳检测。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleState; public class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { System.out.println("读空闲"); // 关闭连接 ctx.close(); } else if (event.state() == IdleState.WRITER_IDLE) { System.out.println("写空闲"); } else if (event.state() == IdleState.ALL_IDLE) { System.out.println("读写空闲"); // 发送心跳包 ctx.writeAndFlush("ping\n"); } } else { super.userEventTriggered(ctx, evt); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
4. 配置 Netty 客户端
创建一个 Netty 客户端启动类,实现自动重连和心跳检测。
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class NettyClient { private final String host = "localhost"; private final int port = 8080; private final int MAX_RETRY = 5; private int retry = 0; public void start() { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); ch.pipeline().addLast(new ClientHeartbeatHandler()); } }); connect(b); } catch (Exception e) { e.printStackTrace(); } } private void connect(Bootstrap b) { b.connect(host, port).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("连接服务器成功"); } else { System.out.println("连接服务器失败,尝试重连"); retry++; if (retry < MAX_RETRY) { future.channel().eventLoop().schedule(() -> connect(b), 2 << retry, TimeUnit.SECONDS); } else { System.out.println("重连失败次数达到最大,放弃连接"); } } } }); } }
5. 实现客户端心跳处理器
创建一个 ClientHeartbeatHandler
类处理心跳包。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleState; public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.WRITER_IDLE) { System.out.println("发送心跳包"); ctx.writeAndFlush("ping\n"); } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接断开,尝试重连"); // 在这里实现重连逻辑 // 比如: ctx.channel().eventLoop().schedule(() -> connect(), 5, TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
6. 启动 Spring Boot 应用
在 Spring Boot 的主类中启动 Netty 服务器和客户端。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PostConstruct; @SpringBootApplication public class NettySpringBootApplication { @Autowired private NettyServer nettyServer; @Autowired private NettyClient nettyClient; public static void main(String[] args) { SpringApplication.run(NettySpringBootApplication.class, args); } @PostConstruct public void startNetty() { new Thread(() -> { try { nettyServer.run(); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> nettyClient.start()).start(); } }
关键点总结
依赖引入:确保引入了 Spring Boot 和 Netty 的必要依赖。
Netty 服务器配置:使用 ServerBootstrap 配置服务器端,包括心跳检测处理。
Netty 客户端配置:使用 Bootstrap 配置客户端,实现自动重连和心跳检测。
心跳处理器:在服务器端和客户端分别实现心跳检测处理器,处理心跳包和连接超时。
Spring Boot 集成:在 Spring Boot 应用启动时启动 Netty 服务器和客户端。