Netty框架的主要线程就是I/O线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。Netty的线程模型被精心地设计,既提升了框架的并发性能,又能在很大程度避免锁,局部实现了无锁化设计。
线程模型
一般首先会想到的是经典的Reactor线程模型,尽管不同的NIO框架对于Reactor模式的实现存在差异,但本质上还是遵循了Reactor的基础线程模型。
Reactor单线程模型
Reactor单线程模型,是指所有的I/O操作都在同一个NIO线程上面完成。
NIO线程的职责如下:
- 作为NIO服务端,接收客户端的TCP连接;
- 作为NIO客户端,向服务端发起TCP连接;
- 读取通信对端的请求或者应答消息;
- 向通信对端发送消息请求或者应答消息。
由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor类接收客户端的TCP连接请求消息,当链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上,进行消息解码。用户线程消息编码后通过NIO线程将消息发送给客户端。
在一些小容量应用场景下,可以使用单线程模型。但是这对于高负载、大并发的应用场景却不合适,主要原因如下:
- 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。
- 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。
- 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
Reactor多线程模型
Rector多线程模型与单线程模型最大的区别就是有一组NIO线程来处理I/O操作。
Reactor多线程模型的特点如下:
- 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP连接请求。
- 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。
- 一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题。
在绝大多数场景下,Reactor多线程模型可以满足性能需求。但是,在个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足的问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。
主从Reactor多线程模型
主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作。
利用主从NIO线程模型,可以解决一个服务端监听线程无法有效处理所有客户端连接的性能不足问题。因此,在Netty的官方demo中,推荐使用该线程模型。
Netty的线程模型
Netty的线程模型并不是一成不变的,它实际取决于用户的启动参数配置。通过设置不同的启动参数,Netty可以同时支持Reactor单线程模型、多线程模型和主从Reactor多线层模型。
下面让我们通过一张原理图(图18-4)来快速了解Netty的线程模型。
可以通过Netty服务端启动代码来了解它的线程模型:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch)throws IOException{ ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler()); } }); // 绑定端口,同步等待成功 b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池。一个用于接收客户端的TCP连接,另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
Netty用于接收客户端请求的线程池职责如下。
(1)接收客户端TCP连接,初始化Channel参数;
(2)将链路状态变更事件通知给ChannelPipeline。
Netty处理I/O操作的Reactor线程池职责如下。
(1)异步读取通信对端的数据报,发送读事件到ChannelPipeline;
(2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;
(3)执行系统调用Task;
(4)执行定时任务Task,例如链路空闲状态监测定时任务。
通过调整线程池的线程个数、是否共享线程池等方式,Netty的Reactor线程模型可以在单线程、多线程和主从多线程间切换,这种灵活的配置方式可以最大程度地满足不同用户的个性化定制。
为了尽可能地提升性能,Netty在很多地方进行了无锁化的设计,例如在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列—多个工作线程的模型性能更优。
它的设计原理如图:
Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead (Object msg)。只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。
最佳实践
Netty的多线程编程最佳实践如下。
(1)创建两个NioEventLoopGroup,用于逻辑隔离NIO Acceptor和NIO I/O线程。
(2)尽量不要在ChannelHandler中启动用户线程(解码后用于将POJO消息派发到后端业务线程的除外)。
(3)解码要放在NIO线程调用的解码Handler中进行,不要切换到用户线程中完成消息的解码。
(4)如果业务逻辑操作非常简单,没有复杂的业务逻辑计算,没有可能会导致线程被阻塞的磁盘操作、数据库操作、网路操作等,可以直接在NIO线程上完成业务逻辑编排,不需要切换到用户线程。
(5)如果业务逻辑处理复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其他的I/O操作。
NioEventLoop
Netty的NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O的读写之外,还兼顾处理以下两类任务:
- 系统Task:通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建它们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化。
- 定时任务:通过调用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)方法实现。
正是因为NioEventLoop具备多种职责,所以它的实现比较特殊,它并不是个简单的Runnable。
它实现了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是因为这种设计,导致NioEventLoop和其父类功能实现非常复杂。
NioEventLoop源码分析
Selector的初始化
作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个多路复用器对象。
Selector的初始化非常简单,直接调用Selector.open()方法就能创建并打开一个新的Selector。Netty对Selector的selectedKeys进行了优化,用户可以通过io.netty.noKeySetOptimization开关决定是否启用该优化项。默认不打开selectedKeys优化功能。
Selector selector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) { super(parent, executor, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { //通过provider.openSelector()创建并打开多路复用器 selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //如果没有开启selectedKeys优化开关,就立即返回。 if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } //如果开启了优化开关,需要通过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys, //将上述两个成员变量设置为可写,通过反射的方式使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉。 try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Class<?> selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader()); // Ensure the current selector implementation is what we can instrument. if (!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); selectedKeys = selectedKeySet; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable t) { selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t); } return selector; }
run方法的实现
@Override protected void run() { //所有的逻辑操作都在for循环体内进行,只有当NioEventLoop接收到退出指令的时候,才退出循环,否则一直执行下去,这也是通用的NIO线程实现方式。 for (;;) { //首先需要将wakenUp还原为false,并将之前的wakeup状态保存到oldWakenUp变量中。 oldWakenUp = wakenUp.getAndSet(false); try { //通过hasTasks()方法判断当前的消息队列中是否有消息尚未处理 if (hasTasks()) { //如果有则调用selectNow()方法立即进行一次select操作,看是否有准备就绪的Channel需要处理。 //Selector的selectNow()方法会立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪Channel的集合,否则返回0。 //选择完成之后,再次判断用户是否调用了Selector的wakeup方法,如果调用,则执行selector.wakeup()操作。 selectNow(); } else { //执行select()方法,由Selector多路复用器轮询,看是否有准备就绪的Channel。 //取当前系统的纳秒时间,调用delayNanos()方法计算获得NioEventLoop中定时任务的触发时间。 //计算下一个将要触发的定时任务的剩余超时时间,将它转换成毫秒,为超时时间增加0.5毫秒的调整值。 //对剩余的超时时间进行判断,如果需要立即执行或者已经超时,则调用selector.selectNow()进行轮询操作,将selectCnt设置为1,并退出当前循环。 //将定时任务剩余的超时时间作为参数进行select操作,每完成一次select操作,对select计数器selectCnt加1。 //Select操作完成之后,需要对结果进行判断,如果存在下列任意一种情况,则退出当前循环: //if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {break} //1.有Channel处于就绪状态,selectedKeys不为0,说明有读写事件需要处理; //2.oldWakenUp为true; //3.系统或者用户调用了wakeup操作,唤醒当前的多路复用器; //4.消息队列中有新的任务需要处理。 //如果本次Selector的轮询结果为空,也没有wakeup操作或是新的消息需要处理,则说明是个空轮询. //有可能触发了JDK的epoll bug,它会导致Selector的空轮询,使I/O线程一直处于100%状态。 //截止到当前最新的JDK7版本,该bug仍然没有被完全修复。所以Netty需要对该bug进行规避和修正。 //该Bug的修复策略如下: //(1)对Selector的select操作周期进行统计; //(2)每完成一次空的select操作进行一次计数; //(3)在某个周期(例如100ms)内如果连续发生N次空轮询,说明触发了JDK NIO的epoll()死循环bug。 //监测到Selector处于死循环后,需要通过重建Selector的方式让系统恢复正常,重建步骤如下: //(1)首先通过inEventLoop()方法判断是否是其他线程发起的rebuildSelector, //如果由其他线程发起,为了避免多线程并发操作Selector和其他资源,需要将rebuildSelector封装成Task, //放到NioEventLoop的消息队列中,由NioEventLoop线程负责调用,这样就避免了多线程并发操作导致的线程安全问题。 //(2)调用openSelector方法创建并打开新的Selector //(3)通过循环,将原Selector上注册的SocketChannel从旧的Selector上去注册,重新注册到新的Selector上,并将老的Selector关闭。 //过销毁旧的、有问题的多路复用器,使用新建的Selector,就可以解决空轮询Selector导致的I/O线程CPU占用100%的问题。 select(); //判断用户是否调用了Selector的wakeup方法 if (wakenUp.get()) { //如果调用,则执行selector.wakeup()操作。 selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; //如果轮询到了处于就绪状态的SocketChannel,则需要处理网络I/O事件 if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { //由于默认未开启selectedKeys优化功能,所以会进入processSelectedKeysPlain分支执行。 //1.对SelectionKey进行保护性判断,如果为空则返回。 //2.获取SelectionKey的迭代器进行循环操作,通过迭代器获取SelectionKey和SocketChannel的附件对象, //3.将已选择的选择键从迭代器中删除,防止下次被重复选择和处理 //4.对SocketChannel的附件类型进行判读, //如果是AbstractNioChannel类型,说明它是NioServerSocketChannel或者NioSocketChannel,需要进行I/O读写相关的操作 //步骤如下: //--首先从NioServerSocketChannel或者NioSocketChannel中获取其内部类Unsafe,判断当前选择键是否可用, //--如果不可用,则调用Unsafe的close方法,释放连接资源。 //--如果选择键可用,则继续对网络操作位进行判断,如下: //----如果是读或者连接操作,则调用Unsafe的read方法。此处Unsafe的实现是个多态 //对于NioServerSocketChannel,它的读操作就是接收客户端的TCP连接。 //对于NioSocketChannel,它的读操作就是从SocketChannel中读取ByteBuffer。 //----如果网络操作位为写,则说明有半包消息尚未发送完成,需要继续调用flush方法进行发送 //----如果网络操作位为连接状态,则需要对连接结果进行判读,在进行finishConnect判断之前,需要将网络操作位进行修改,注销掉SelectionKey.OP_CONNECT。 //如果它是NioTask,则对其进行类型转换,调用processSelectedKey进行处理。由于Netty自身没实现NioTask接口,所以通常情况下系统不会执行该分支,除非用户自行注册该Task到多路复用器。 processSelectedKeysPlain(selector.selectedKeys()); } //由于NioEventLoop需要同时处理I/O事件和非I/O任务,为了保证两者都能得到足够的CPU时间被执行,Netty提供了I/O比例供用户定制。 //如果I/O操作多于定时任务和Task,则可以将I/O比例调大,反之则调小,默认值为50%。 //Task的执行时间根据本次I/O操作的执行时间计算得来。 final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio;//50% //处理完I/O事件之后,NioEventLoop需要执行非I/O操作的系统Task和定时任务 //首先从定时任务消息队列中弹出消息进行处理,如果消息队列为空,则退出循环。 //根据当前的时间戳进行判断,如果该定时任务已经或者正处于超时状态,则将其加入到执行Task Queue中,同时从延时队列中删除。 //定时任务如果没有超时,说明本轮循环不需要处理,直接退出即可。 //执行Task Queue中原有的任务和从延时队列中复制的已经超时或者正处于超时状态的定时任务 //由于获取系统纳秒时间是个耗时的操作,每次循环都获取当前系统纳秒时间进行超时判断会降低性能。 //为了提升性能,每执行60次循环判断一次,如果当前系统时间已经到了分配给非I/O操作的超时时间,则退出循环。 //这是为了防止由于非I/O任务过多导致I/O操作被长时间阻塞。 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //判断系统是否进入优雅停机状态,如果处于关闭状态。 if (isShuttingDown()) { //调用closeAll方法,释放资源。遍历获取所有的Channel,调用它的Unsafe.close()方法关闭所有链路,释放线程池、ChannelPipeline和ChannelHandler等资源。 closeAll(); //并让NioEventLoop线程退出循环,结束运行。 if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }