相信如果是搞java的大伙在工作一两年后一定听过 netty
的大名,使用netty
我们可以简单快速的开发出tcp、udp连接协议的程序。
本文带给大家一个netty程序的server端实战演练,麻雀虽小,五脏俱全,具体讲解都在代码中
pom配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wayn.nettyproject</groupId> <artifactId>netty-project</artifactId> <version>1.0-SNAPSHOT</version> <name>netty-project</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <!--netty依赖--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <!-- Use 'netty-all' for 4.0 or above --> <version>4.1.72.Final</version> </dependency> <!--lombok 依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!--##### log4j2 start ####--> <!--log4j2核心包--> <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.17.1</version> </dependency> <!--用于与slf4j保持桥接(里面自动依赖了slf4j-api)--> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.17.1</version> </dependency> <!-- 异步日志,需要加入disruptor依赖 --> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency> <!--##### log4j2 end ####--> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.79</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.18</version> </dependency> </dependencies> <build> </build> </project>
tcp服务端
/** * tcp服务端 */ @Slf4j public class NettyServer { // 绑定端口 private static final int port = 99; // 维护channel列表,将用户信息和channel做绑定,一个channel代表一个tcp连接 private static final Map<String, Object> channelMap = new HashMap<>(); public static void main(String[] args) throws InterruptedException { // 创建服务端启动器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 指定连接接收线程,处理所有用户连接,转给工作线程 NioEventLoopGroup boss = null; // 指定工作线程,处理用户连接 NioEventLoopGroup worker = null; try { boss = new NioEventLoopGroup(); worker = new NioEventLoopGroup(); serverBootstrap.group(boss, worker); // 绑定线程组 serverBootstrap.channel(NioServerSocketChannel.class); // 底层使用nio处理 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { // 创建一个心跳检测的handle,服务端创建连接后,5秒内没有收到客户端的心跳就会触发IdleState.READER_IDLE IdleStateHandler idleStateHandler = new IdleStateHandler(5, 0, 0) { // 5秒内没有收到客户端的心跳会触发userEventTriggered方法 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; // 检测是读状态就发送pong消息 if (e.state() == IdleState.READER_IDLE) { ByteBuf buffer = Unpooled.buffer(); buffer.writeBytes("pong".getBytes(StandardCharsets.UTF_8)); ctx.channel().writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); String id = ctx.channel().id().asLongText(); // 删除channelMap中客户端连接 channelMap.forEach((s, o) -> { if (s.equals(id)) { channelMap.remove(s); } }); } } } // 发生异常时,关闭连接 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(cause.getMessage(), cause); String id = ctx.channel().id().asLongText(); channelMap.remove(id); ctx.close(); } }; // 通过对象实例,而不是new创建,避免大量客户端连接导致浪费空间 ch.pipeline().addLast("idleStateHandler", idleStateHandler); // 换行符解码器,指定包与包之间的分隔符是"\n"和"\r\n",避免拆包粘包 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // netty日志记录,打印包信息 ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); // 入站字符解码器 ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); // 自定义解码器,实现自定义业务逻辑,使用SimpleChannelInboundHandler可以避免ByteBuf的回收问题 ch.pipeline().addLast("MyHandle", new SimpleChannelInboundHandler<String>() { // 连接第一次创建会调用这个方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String id = ctx.channel().id().asLongText(); log.info("客户端建立连接:{}", id); channelMap.put(id, ctx.channel()); ctx.channel().writeAndFlush("this is come from server!"); super.channelActive(ctx); } // 获取来自客户端的数据 @Override protected void channelRead0(ChannelHandlerContext ctx, String s) { log.info("收到客户端消息:{}", s); ctx.channel().writeAndFlush("server reply\r\n"); } }); // 出站字符编码器 ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); } }); ChannelFuture channelFuture = serverBootstrap.bind(port).sync().addListener(future -> { if (future.isSuccess()) { log.info("server start up on {}", port); } }); channelFuture.channel().closeFuture().sync(); } finally { // 不要忘了在finally中关闭线程组 if (boss != null) { boss.shutdownGracefully(); } if (worker != null) { worker.shutdownGracefully(); } } } }
本地测试