1 基本架构
2 代码实现
2.1 Netty服务端:
# 目录结构 sp_netty_server ... - handler - NettyServerHandler.java - rpc - NettyServer.java - ServerChannelInitializer.java SpNettyServerApplication.java
依赖和配置文件:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <optional>true</optional> </dependency> <!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version> </dependency>
server.port=8801
具体代码:
NettyServerHandler.java
@Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 客户端连接会触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Channel active ..."); } /** * 客户端发消息会触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("服务器收到消息: {}", msg.toString()); ctx.write(msg.toString()+",Hello too !"); ctx.flush(); } /** * 发生异常触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
NettyServer.java
@Slf4j public class NettyServer { public void start() { InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082); //new 一个主线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //new 一个工作线程组 EventLoopGroup workGroup = new NioEventLoopGroup(200); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()) .localAddress(socketAddress) //设置队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口,开始接收进来的连接 try { ChannelFuture future = bootstrap.bind(socketAddress).sync(); log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("服务器开启失败", e); } finally { //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } } }
ServerChannelInitializer.java
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //添加编解码 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new NettyServerHandler()); } }
SpNettyServerApplication.java
@Slf4j @SpringBootApplication public class SpNettyServerApplication extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(SpNettyServerApplication.class); } public static void main(String[] args) { SpringApplication.run(SpNettyServerApplication.class, args); //开启Netty服务 NettyServer nettyServer = new NettyServer(); nettyServer.start(); log.info("server is running ..."); } }
2.2 Netty客户端:
# 目录结构 sp_netty_client ... - controller - HelloController.java - data - ResponseResult.java - handler - NettyClientHandler.java - rpc - NettyClientUtil.java SpNettyClientApplication.java
具体代码:
SpNettyClientApplication.java
@SpringBootApplication public class SpNettyClientApplication { public static void main(String[] args) { SpringApplication.run(SpNettyClientApplication.class, args); } }
NettyClientUtil.java
@Slf4j public class NettyClientUtil { public static ResponseResult helloNetty(String msg) { NettyClientHandler nettyClientHandler = new NettyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap() .group(group) //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输 .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder", new StringDecoder()); socketChannel.pipeline().addLast("encoder", new StringEncoder()); socketChannel.pipeline().addLast(nettyClientHandler); } }); try { ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync(); log.info("客户端发送成功...."); //发送消息 future.channel().writeAndFlush(msg); // 等待连接被关闭 future.channel().closeFuture().sync(); return nettyClientHandler.getResponseResult(); } catch (Exception e) { log.error("客户端Netty失败", e); } finally { //以一种优雅的方式进行线程退出 group.shutdownGracefully(); } return new ResponseResult(500, null, "server error"); } }
NettyClientHandler.java
@Slf4j @Setter @Getter public class NettyClientHandler extends ChannelInboundHandlerAdapter { private ResponseResult responseResult; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("client is running ..."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客户端收到消息: {}", msg.toString()); this.responseResult = new ResponseResult(200, null, msg.toString()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ResponseResult.java
@Setter @Getter @ToString @NoArgsConstructor @AllArgsConstructor public class ResponseResult implements Serializable { private int code; private Object data; private String msg; }
HelloController.java
@RestController public class HelloController { @RequestMapping("/hello/{name}") public ResponseResult helloNetty(@PathVariable("name") String name) { return NettyClientUtil.helloNetty(name); } }
2.3 测试
参考: