jetty的NIO线程模型

简介:

概述

jetty NIO是典型reactor模型,如下图所示:

即:mainReactor负责监听server socket,接受新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,扔给worker线程池来处理。本文主要是讲解jetty中mainReactor、subReactor、线程池的实现。

mainReactor

jetty中的server就相当于一个容器,一个jetty容器包含多个连接器和一个线程池,连接器实现了LifeCycle接口,随容器启动而启动,下图是连接器启动后,监听server socket,建立连接的过程:

可见,jetty利用了线程池来建立连接,每一个连接任务被当成一个job被放到了job队列里面,负责连接的线程会从队列中取出任务来执行,将得到的ServerSocket交给subReactor,下面来看subReactor的实现。

subReactor

这里需要提一下jetty nio很重要的一个类SelectorManager,它负责channel注册,select,wakeup等操作。在SelectorManager中有SelectSet数组,可以把SelectSet理解为SelectorManager的代理,因为真正做事的是SelectSet,这里面SelectSet设计为一个数组,应该也是分而治之的思想,让一个selector监听更少的selectionkey。

SelectSet中有一个非常重要的成员changes,changes中存放了所有有变化的channel、endpoint、attachement。分别在以下情况触发addChannel方法:当有新的通道加入时,当有新的事件到来时,当有数据到来时。

subReactor的执行流程如下图:

在这里导致addChange除了selectorManager.register之外,还有endpoint.updatekey()以及selectionkey数据有变化时等等。

ThreadPool

jetty的线程池相当简单,其实mainReactor与subReactor共用同一个线程池,线程池的实现类是QueuedThreadPool,当然在jetty.xml中可以设置自己的线程池类。简单看下线程池的run方法

 

[java]  view plain  copy
 
  print ?
  1. private Runnable _runnable = new Runnable()  
  2.   {  
  3.       public void run()  
  4.       {  
  5.           boolean shrink=false;  
  6.           try  
  7.           {  
  8.               Runnable job=_jobs.poll();  
  9.               while (isRunning())  
  10.               {  
  11.                   // Job loop  
  12.                   while (job!=null && isRunning())  
  13.                   {  
  14.                       runJob(job);  
  15.                       job=_jobs.poll();  
  16.                   }  
  17.   
  18.                   // Idle loop  
  19.                   try  
  20.                   {  
  21.                       _threadsIdle.incrementAndGet();  
  22.   
  23.                       while (isRunning() && job==null)  
  24.                       {  
  25.                           if (_maxIdleTimeMs<=0)  
  26.                               job=_jobs.take();  
  27.                           else  
  28.                           {  
  29.                               // maybe we should shrink?  
  30.                               final int size=_threadsStarted.get();  
  31.                               if (size>_minThreads)  
  32.                               {  
  33.                                   long last=_lastShrink.get();  
  34.                                   long now=System.currentTimeMillis();  
  35.                                   if (last==0 || (now-last)>_maxIdleTimeMs)  
  36.                                   {  
  37.                                       shrink=_lastShrink.compareAndSet(last,now) &&  
  38.                                       _threadsStarted.compareAndSet(size,size-1);  
  39.                                       if (shrink)  
  40.                                           return;  
  41.                                   }  
  42.                               }  
  43.                               job=idleJobPoll();  
  44.                           }  
  45.                       }  
  46.                   }  
  47.                   finally  
  48.                   {  
  49.                       _threadsIdle.decrementAndGet();  
  50.                   }  
  51.               }  
  52.           }  
  53.           catch(InterruptedException e)  
  54.           {  
  55.                 ...  
  56.           }  
  57.       }  
  58.   };  

1、线程池有个最小线程数_minThreads=8,当线程池启动时会创建_minThreads个线程,并启动它们。第12行,线程从任务队列中取出一个任务,并执行。这里使用了while循环表示这里会阻塞等待任务执行完,当任务队列中没有任务时,才会退出while循环;

 

2、退出while循环后,这个线程就空闲了,在这里需要有个回收策略,在等待_maxIdleTimeMs时间后,如果当前线程数大于_minThreads时,就会回收这个线程。

那么线程数什么时候会大于_minThreads?来看看dispatch()方法中的核心代码

 

[java]  view plain  copy
 
  print ?
  1. // If we had no idle threads or the jobQ is greater than the idle threads  
  2.                if (idle==0 || jobQ>idle)  
  3.                {  
  4.                    int threads=_threadsStarted.get();  
  5.                    if (threads<_maxThreads)  
  6.                        startThread(threads);  
  7.                }  

如果没有空闲的线程或者空闲线程数太少,在保证线程数没有超过_maxThreads时会新建线程。


原文链接:[http://wely.iteye.com/blog/2360486]

相关文章
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
23天前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
25天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
16 1
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
98 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
47 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
1月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
23 0
|
4月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
165 1
|
4月前
|
缓存 编译器 Go
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
55 3
|
4月前
|
算法 调度 人工智能
人工智能线程问题之无锁化编程如何解决
人工智能线程问题之无锁化编程如何解决
48 2