原创Se7enSe7en的架构笔记 2021-03-27 20:52
为什么使用 Netty
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能和高伸缩性的服务器和客户端。Netty 拥有高性能,吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
Netty 和 NIO
NIO 的缺点
- NIO 的类库和 API 繁杂,学习成本高,你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要熟悉 Java 多线程编程。这是因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的 NIO 程序。
- 臭名昭著的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然没得到根本性的解决。
Netty 的优点
- Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,API使用简单,学习成本低。
- 功能强大,内置了多种解码编码器,支持多种协议。
- 性能高,对比其他主流的 NIO 框架,Netty 的性能最优。
- 社区活跃,发现 BUG 会及时修复,迭代版本周期短,不断加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,质量得到验证。
Netty线程模型
模型解释
- 1.Netty 抽象出两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写。
- 2.BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。NioEventLoopGroup 相当于是一个事件循环线程组,这个组内含有多个事件循环线程,每一个事件循环线程都是 NioEventLoop。
- 3.每个 NioEventLoop 都有一个 Selector,用于监听注册在其上的 SocketChannel 的网络通讯。
- 4.每个 Boss NioEventLoop 线程内部循环执行的3个步骤:
- 处理 Accept 事件,与客户端建立连接,生成 NioSocketChannel。
- 将 NioSocketChannel 注册到某个 Worker NioEventLoop 上的 Selector。
- 处理任务队列的任务,即 runAllTask。
- 5.每个 Worker NioEventLoop 线程内部循环执行的3个步骤:
- 轮询注册到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
- 处理 I/O 事件,即 read,write 事件,在对应的 NioSocketChannel 处理业务。
- runAllTasks 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中慢慢处理,这样不影响数据在 Pipeline 中的流动处理。
- 6.每个 Worker NioEventLoop 处理 NioSocketChannel 业务时,会使用 Pipeline(管道),Pipeline 中维护了很多的 handler 处理器用来处理 NioSocketChannel 中的数据。
Netty 客户端 & 服务器开发
创建并配置服务器启动器
Bootstrap、ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类。
group()
服务端要使用两个线程组:
- bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到 workerGroup 的 Selector 中。
- workerGroup 用于处理每一个连接发生的读写事件。
一般创建线程组直接使用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:
//设置bossGroup线程数为1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //设置workerGroup线程数为8 EventLoopGroup workerGroup = new NioEventLoopGroup(8);
channel()
这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的 Channel 实例。
常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选:
- NioSocketChannel:异步非阻塞的客户端 TCP Socket 连接。
- NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连接。
还有就是同步阻塞的,一般没什么人用:
- OioSocketChannel:同步阻塞的客户端 TCP Socket 连接。
- OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连接。
option() 与 childOption()
option() 设置的是服务端用于接收进来的连接,也就是 boosGroup 线程。option() 常用参数:
SO_BACKLOG //Socket 参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows 为200,其他为128。
childOption() 是提供给父管道接收到的连接,也就是 workerGroup 线程。childOption() 常用的参数:
SO_RCVBUF //Socket 参数,TCP 数据接收缓冲区大小。 TCP_NODELAY //TCP 参数,立即发送数据,默认值为 True。 SO_KEEPALIVE //Socket 参数,连接保活,默认值为 False。启用该功能时,TCP 会主动探测空闲连接的有效性。
pipeline(ChannelPipeline)
ChannelPipeline 是 Netty 处理请求的责任链,ChannelHandler 则是具体处理请求的处理器。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:处理器 Handler 分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)。
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler,通过 ChannelHandlerContext 上下文对象,就可以拿到 Channel、Pipeline 等对象,就可以进行读写等操作。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler。两种类型的 Handler 互不干扰,相同类型的 Handler 的处理顺序是有影响的。
在 Bootstrap 中 childHandler() 方法需要初始化通道,实例化一个 ChannelInitializer,这时候需要重写 initChannel() 初始化通道的方法,装配流水线就是在这个地方进行。代码如下:
//使用匿名内部类的形式初始化通道对象 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行 @Override protected void initChannel(SocketChannel ch) throws Exception { //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler ch.pipeline().addLast(new NettyServerHandler()); } });
自定义 NettyServerHandler 代码如下:
package com.chengzw.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范) * @author 程治玮 * @since 2021/3/25 9:31 下午 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取客户端发送的数据 * * @param ctx 上下文对象, 含有通道 channel,管道 pipeline * @param msg 就是客户端发送的数据 * @throws Exception */ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName()); //将 msg 转成一个 ByteBuf,类似 NIO 的 ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8)); } /** * 数据读取完毕后的处理方法 * * @param ctx 上下文对象, 含有通道 channel,管道 pipeline * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } /** * 处理异常, 一般是需要关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
bind()
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动,如果加上 sync() 方法则是同步。
// sync 同步 ChannelFuture channelFuture = bootstrap.bind(9000).sync(); // 异步 // 给cf注册监听器,监听我们关心的事件 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (channelFuture.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });
优雅地关闭 EventLoopGroup
//释放掉所有的资源,包括创建的线程 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
服务端启动器完整代码
package com.chengzw.netty.base; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Netty 服务端 * @author 程治玮 * @since 2021/3/25 9:31 下午 */ public class NettyServer { public static void main(String[] args) throws InterruptedException { // 创建两个线程组 bossGroup 和 workerGroup, 含有的子线程 NioEventLoop 的个数默认为cpu核数的两倍 // bossGroup 只是处理连接请求 ,真正的和客户端业务处理,会交给 workerGroup 完成 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1个线程 EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8个线程 try { // 创建服务器端的启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); // 使用链式编程来配置参数 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 // 使用 NioServerSocketChannel 作为服务器的通道实现,该类用于实例化新的 Channel 来接收客户端的连接 .channel(NioServerSocketChannel.class) // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行 @Override protected void initChannel(SocketChannel ch) throws Exception { //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler ch.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("netty server start ..."); // 绑定一个端口并且同步, 生成了一个 ChannelFuture 异步对象,通过 isDone() 等方法可以判断异步事件的执行情况 // 启动服务器(并绑定端口),bind 是异步操作,sync 方法是等待异步操作执行完毕 // sync 同步 ChannelFuture channelFuture = bootstrap.bind(9000).sync(); // 异步 // 给cf注册监听器,监听我们关心的事件 /*channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (channelFuture.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });*/ // 等待服务端监听端口关闭,closeFuture是异步操作 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法 // 在这里面cf.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程, // 即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。 // 原文的例子有英文注释: // Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server. // 线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver, channelFuture.channel().closeFuture().sync(); } finally { // 资源优雅释放 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
创建并配置客户端启动器
客户端只需要一个 NioEventLoopGroup,其余代码和服务器类似。首先自定义 NettyClientHandler 用于处理客户端 ChannelPipeline 的业务。
package com.chengzw.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范) * @author 程治玮 * @since 2021/3/25 9:50 下午 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } /** * 当通道有读取事件时会触发,即服务端发送数据给客户端 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务端的地址: " + ctx.channel().remoteAddress()); } /** * 处理异常, 一般是需要关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
然后配置客户端启动器:
package com.chengzw.netty.base; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * Netty 客户端 * @author 程治玮 * @since 2021/3/25 9:52 下午 */ public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是ServerBootstrap而是Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入处理器 ch.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("netty client start.."); //启动客户端去连接服务器端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); //对通道关闭进行监听 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }