jetty的NIO线程模型

简介:

概述

jetty NIO是典型reactor模型,如下图所示:

即:mainReactor负责监听server socket,接受新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,扔给worker线程池来处理。本文主要是讲解jetty中mainReactor、subReactor、线程池的实现。

mainReactor

jetty中的server就相当于一个容器,一个jetty容器包含多个连接器和一个线程池,连接器实现了LifeCycle接口,随容器启动而启动,下图是连接器启动后,监听server socket,建立连接的过程:

可见,jetty利用了线程池来建立连接,每一个连接任务被当成一个job被放到了job队列里面,负责连接的线程会从队列中取出任务来执行,将得到的ServerSocket交给subReactor,下面来看subReactor的实现。

subReactor

这里需要提一下jetty nio很重要的一个类SelectorManager,它负责channel注册,select,wakeup等操作。在SelectorManager中有SelectSet数组,可以把SelectSet理解为SelectorManager的代理,因为真正做事的是SelectSet,这里面SelectSet设计为一个数组,应该也是分而治之的思想,让一个selector监听更少的selectionkey。

SelectSet中有一个非常重要的成员changes,changes中存放了所有有变化的channel、endpoint、attachement。分别在以下情况触发addChannel方法:当有新的通道加入时,当有新的事件到来时,当有数据到来时。

subReactor的执行流程如下图:

在这里导致addChange除了selectorManager.register之外,还有endpoint.updatekey()以及selectionkey数据有变化时等等。

ThreadPool

jetty的线程池相当简单,其实mainReactor与subReactor共用同一个线程池,线程池的实现类是QueuedThreadPool,当然在jetty.xml中可以设置自己的线程池类。简单看下线程池的run方法

 

[java] view plain copy

 

 print?

  1. private Runnable _runnable = new Runnable()  
  2.  {  
  3.      public void run()  
  4.      {  
  5.          boolean shrink=false;  
  6.          try  
  7.          {  
  8.              Runnable job=_jobs.poll();  
  9.              while (isRunning())  
  10.              {  
  11.                  // Job loop  
  12.                  while (job!=null && isRunning())  
  13.                  {  
  14.                      runJob(job);  
  15.                      job=_jobs.poll();  
  16.                  }  
  17.  
  18.                  // Idle loop  
  19.                  try  
  20.                  {  
  21.                      _threadsIdle.incrementAndGet();  
  22.  
  23.                      while (isRunning() && job==null)  
  24.                      {  
  25.                          if (_maxIdleTimeMs<=0)  
  26.                              job=_jobs.take();  
  27.                          else  
  28.                          {  
  29.                              // maybe we should shrink?  
  30.                              final int size=_threadsStarted.get();  
  31.                              if (size>_minThreads)  
  32.                              {  
  33.                                  long last=_lastShrink.get();  
  34.                                  long now=System.currentTimeMillis();  
  35.                                  if (last==0 || (now-last)>_maxIdleTimeMs)  
  36.                                  {  
  37.                                      shrink=_lastShrink.compareAndSet(last,now) &&  
  38.                                      _threadsStarted.compareAndSet(size,size-1);  
  39.                                      if (shrink)  
  40.                                          return;  
  41.                                  }  
  42.                              }  
  43.                              job=idleJobPoll();  
  44.                          }  
  45.                      }  
  46.                  }  
  47.                  finally  
  48.                  {  
  49.                      _threadsIdle.decrementAndGet();  
  50.                  }  
  51.              }  
  52.          }  
  53.          catch(InterruptedException e)  
  54.          {  
  55.                ...  
  56.          }  
  57.      }  
  58.  };  

1、线程池有个最小线程数_minThreads=8,当线程池启动时会创建_minThreads个线程,并启动它们。第12行,线程从任务队列中取出一个任务,并执行。这里使用了while循环表示这里会阻塞等待任务执行完,当任务队列中没有任务时,才会退出while循环;

 

2、退出while循环后,这个线程就空闲了,在这里需要有个回收策略,在等待_maxIdleTimeMs时间后,如果当前线程数大于_minThreads时,就会回收这个线程。

那么线程数什么时候会大于_minThreads?来看看dispatch()方法中的核心代码

 

[java] view plain copy

 

 print?

  1. // If we had no idle threads or the jobQ is greater than the idle threads  
  2.               if (idle==0 || jobQ>idle)  
  3.               {  
  4.                   int threads=_threadsStarted.get();  
  5.                   if (threads<_maxThreads)  
  6.                       startThread(threads);  
  7.               }  

如果没有空闲的线程或者空闲线程数太少,在保证线程数没有超过_maxThreads时会新建线程。


原文链接:[http://wely.iteye.com/blog/2360486]

相关文章
|
8月前
|
Java 应用服务中间件 API
从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理
该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。
|
Java Maven Spring
使用netty实现nio web服务器
使用netty实现nio web服务器
113 0
|
8月前
|
JSON 分布式计算 网络协议
netty实现tomcat(简易版)
netty实现tomcat(简易版)
91 0
|
8月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
8月前
|
存储 缓存 监控
Netty基础篇:详解Netty底层NIO
Netty基础篇:详解Netty底层NIO
|
消息中间件 网络协议 小程序
Jetty、Netty、Tomcat、Undertow
Jetty、Netty、Tomcat、Undertow
612 0
|
Java 应用服务中间件 Android开发
Java 线程池之Jetty 线程池学习总结
Java 线程池之Jetty 线程池学习总结
307 0
|
监控 网络协议 Java
Netty源码分析之NIO
Socket是两台主机之间逻辑连接的端点。TCP/IP是传输层协议,定义数据如何在忘了中进行传输。HTTP是应用成协议,主要用来定义规范,包装数据,方便数据处理。Socket是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。
170 0
【JAVA】如何基于Netty实现简单的RPC 框架
【JAVA】如何基于Netty实现简单的RPC 框架
202 0
【JAVA】如何基于Netty实现简单的RPC 框架
|
设计模式 Java
从I/O多路复用到Netty,还要跨过Java NIO包(二)
从I/O多路复用到Netty,还要跨过Java NIO包(二)
191 0
从I/O多路复用到Netty,还要跨过Java NIO包(二)