Java网络编程与NIO详解10:深度解读Tomcat中的NIO模型

简介: 这位大侠,这是我的公众号:程序员江湖。 分享程序员面试与技术的那些事。 干货满满,关注就送。  转自:http://www.linkedkeeper.com/detail/blog.action?bid=1046 一、I/O复用模型解读 Tomcat的NIO是基于I/O复用来实现的。

微信公众号【Java技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)

cedde63ad8a94ece7aac37190ffe807d1cefc1c2

转自:http://www.linkedkeeper.com/detail/blog.action?bid=1046

一、I/O复用模型解读

Tomcat的NIO是基于I/O复用来实现的。对这点一定要清楚,不然我们的讨论就不在一个逻辑线上。下面这张图学习过I/O模型知识的一般都见过,出自《UNIX网络编程》,I/O模型一共有阻塞式I/O,非阻塞式I/O,I/O复用(select/poll/epoll),信号驱动式I/O和异步I/O。这篇文章讲的是I/O复用。

这里先来说下用户态和内核态,直白来讲,如果线程执行的是用户代码,当前线程处在用户态,如果线程执行的是内核里面的代码,当前线程处在内核态。更深层来讲,操作系统为代码所处的特权级别分了4个级别。不过现代操作系统只用到了0和3两个级别。0和3的切换就是用户态和内核态的切换。更详细的可参照《深入理解计算机操作系统》。I/O复用模型,是同步非阻塞,这里的非阻塞是指I/O读写,对应的是recvfrom操作,因为数据报文已经准备好,无需阻塞。说它是同步,是因为,这个执行是在一个线程里面执行的。有时候,还会说它又是阻塞的,实际上是指阻塞在select上面,必须等到读就绪、写就绪等网络事件。有时候我们又说I/O复用是多路复用,这里的多路是指N个连接,每一个连接对应一个channel,或者说多路就是多个channel。复用,是指多个连接复用了一个线程或者少量线程(在Tomcat中是Math.min(2,Runtime.getRuntime().availableProcessors()))。

上面提到的网络事件有连接就绪,接收就绪,读就绪,写就绪四个网络事件。I/O复用主要是通过Selector复用器来实现的,可以结合下面这个图理解上面的叙述。

二、TOMCAT对IO模型的支持

tomcat从6以后开始支持NIO模型,实现是基于JDK的java.nio包。这里可以看到对read body 和response body是Blocking的。关于这点在第6.3节源代码阅读有重点介绍。

三、TOMCAT中NIO的配置与使用

在Connector节点配置protocol="org.apache.coyote.http11.Http11NioProtocol",Http11NioProtocol协议下默认最大连接数是10000,也可以重新修改maxConnections的值,同时我们可以设置最大线程数maxThreads,这里设置的最大线程数就是Excutor的线程池的大小。在BIO模式下实际上是没有maxConnections,即使配置也不会生效,BIO模式下的maxConnections是保持跟maxThreads大小一致,因为它是一请求一线程模式。

四、NioEndpoint组件关系图解读

我们要理解tomcat的nio最主要就是对NioEndpoint的理解。它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5个部分。LimitLatch是连接控制器,它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。Acceptor负责接收连接,默认是1个线程来执行,将请求的事件注册到事件列表。有Poller来负责轮询,Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。

由Poller将就绪的事件生成SocketProcessor同时交给Excutor去执行。Excutor线程池的大小就是我们在Connector节点配置的maxThreads的值。

在Excutor的线程中,会完成从socket中读取httprequest,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。在从socket中读数据和往socket中写数据的过程是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。详细源代码可以参照 第6.3节。

五、NioEndpoint执行序列图

在下一小节NioEndpoint源码解读中我们将对步骤1-步骤11依次找到对应的代码来说明。

六、NioEndpoint源码解读

6.1、初始化

无论是BIO还是NIO,开始都会初始化连接限制,不可能无限增大,NIO模式下默认是10000。

public void startInternal() throws Exception {
    if (!running) {
        //省略代码...
        initializeConnectionLatch();
        //省略代码...
    }
}
protected LimitLatch initializeConnectionLatch() {
    if (maxConnections==-1) 
    return null;
    if (connectionLimitLatch==null) {
        connectionLimitLatch = new LimitLatch(getMaxConnections());
    }
    return connectionLimitLatch;
}

6.2、步骤解读

下面我们着重叙述跟NIO相关的流程,共分为11个步骤,分别对应上面序列图中的步骤。

步骤1:绑定IP地址及端口,将ServerSocketChannel设置为阻塞。

public void bind() throws Exception {
    //省略代码...
    serverSock.socket().bind(addr,getBacklog());
    serverSock.configureBlocking(true); 
    //省略代码...
    selectorPool.open();
}

步骤2:启动接收线程

public void startInternal() throws Exception {
    if (!running) {
        //省略代码...
        startAcceptorThreads();
    }
}
 
//这个方法实际是在它的超类AbstractEndpoint里面    
protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];
 
    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        Thread t = new Thread(acceptors[i], getName() + "-Acceptor-" + i);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

步骤3:ServerSocketChannel.accept()接收新连接

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        while (running) {
            try {
                //省略代码...
                SocketChannel socket = null;
                try {                        
                    socket = serverSock.accept();//接收新连接
                } catch (IOException ioe) {
                    //省略代码...
                    throw ioe;
                }
                //省略代码...
                if (running && !paused) {
                    if (!setSocketOptions(socket)) {
                        //省略代码...
                    }
                } else {
                    //省略代码...
                }
            } catch (SocketTimeoutException sx) {
            } catch (IOException x) {
                //省略代码...
            } catch (OutOfMemoryError oom) {
                //省略代码...
            } catch (Throwable t) {
                //省略代码...
            }
        }
    }
}

步骤4:将接收到的链接通道设置为非阻塞

步骤5:构造NioChannel对象

步骤6:register注册到轮询线程

protected boolean setSocketOptions(SocketChannel socket) {
    try {
        socket.configureBlocking(false);//将连接通道设置为非阻塞
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        NioChannel channel = nioChannels.poll();//构造NioChannel对象
        //省略代码...
        getPoller0().register(channel);//register注册到轮询线程
    } catch (Throwable t) {
        //省略代码...
    }
    //省略代码...
}

步骤7:构造PollerEvent,并添加到事件队列

protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
public void register(final NioChannel socket) {
    //省略代码...
    PollerEvent r = eventCache.poll();
    //省略代码...
    addEvent(r);
}

步骤8:启动轮询线程

public void startInternal() throws Exception {
    if (!running) {
        //省略代码...
        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        //省略代码...
    }
}

步骤9:取出队列中新增的PollerEvent并注册到Selector

public static class PollerEvent implements Runnable {
    //省略代码...
    @Override
    public void run() {
        if ( interestOps == OP_REGISTER ) {
            try {
                socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
            } catch (Exception x) {
                log.error("", x);
            }
        } else {
            //省略代码...
        }//end if
    }//run
    //省略代码...
}

步骤10:Selector.select()

public void run() {
    // Loop until destroy() is called
    while (true) {
        try {
            //省略代码...
            try {
                if ( !close ) {
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    //省略代码...
                }
                //省略代码...
            } catch ( NullPointerException x ) {
                //省略代码...
            } catch ( CancelledKeyException x ) {
                //省略代码...
            } catch (Throwable x) {
                //省略代码...
            }
            //省略代码...
            Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                     
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                KeyAttachment attachment = (KeyAttachment)sk.attachment();
                if (attachment == null) {
                    iterator.remove();
                } else {
                    attachment.access();
                    iterator.remove();
                    processKey(sk, attachment);//此方法跟下去就是把SocketProcessor交给Excutor去执行
                }
            }//while
            //省略代码...
        } catch (OutOfMemoryError oom) {
            //省略代码...
        }
    }//while
    //省略代码...
}

步骤11:根据选择的SelectionKey构造SocketProcessor提交到请求处理线程

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    try {
        //省略代码...
        SocketProcessor sc = processorCache.poll();
        if ( sc == null ) 
            sc = new SocketProcessor(socket,status);
        else 
            sc.reset(socket,status);
        if ( dispatch && getExecutor()!=null ) 
            getExecutor().execute(sc);
        else 
            sc.run();
    } catch (RejectedExecutionException rx) {
        //省略代码...
    } catch (Throwable t) {
        //省略代码...
    }
    //省略代码...
}

6.3、NioBlockingSelector和BlockPoller介绍

上面的序列图有个地方我没有描述,就是NioSelectorPool这个内部类,是因为在整体理解tomcat的nio上面在序列图里面不包括它更好理解。在有了上面的基础后,我们在来说下NioSelectorPool这个类,对更深层了解Tomcat的NIO一定要知道它的作用。NioEndpoint对象中维护了一个NioSelecPool对象,这个NioSelectorPool中又维护了一个BlockPoller线程,这个线程就是基于辅Selector进行NIO的逻辑。以执行servlet后,得到response,往socket中写数据为例,最终写的过程调用NioBlockingSelector的write方法。代码如下:

public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) 
                throws IOException {  
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());  
    if ( key == null ) throw new IOException("Key no longer registered");  
    KeyAttachment att = (KeyAttachment) key.attachment();  
    int written = 0;  
    boolean timedout = false;  
    int keycount = 1; //assume we can write  
    long time = System.currentTimeMillis(); //start the timeout timer  
    try {  
        while ( (!timedout) && buf.hasRemaining()) {  
            if (keycount > 0) { //only write if we were registered for a write  
                //直接往socket中写数据  
                int cnt = socket.write(buf); //write the data  
                lastWrite.set(cnt);  
                if (cnt == -1)  
                    throw new EOFException();  
                written += cnt;  
                //写数据成功,直接进入下一次循环,继续写  
                if (cnt > 0) {  
                    time = System.currentTimeMillis(); //reset our timeout timer  
                    continue; //we successfully wrote, try again without a selector  
                }  
            }  
            //如果写数据返回值cnt等于0,通常是网络不稳定造成的写数据失败  
            try {  
                //开始一个倒数计数器   
                if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) 
                    att.startWriteLatch(1);  
                //将socket注册到辅Selector,这里poller就是BlockSelector线程  
                poller.add(att,SelectionKey.OP_WRITE);  
                //阻塞,直至超时时间唤醒,或者在还没有达到超时时间,在BlockSelector中唤醒  
                att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);  
            }catch (InterruptedException ignore) {  
                Thread.interrupted();  
            }  
            if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {  
                keycount = 0;  
            }else {  
                //还没超时就唤醒,说明网络状态恢复,继续下一次循环,完成写socket  
                keycount = 1;  
                att.resetWriteLatch();  
            }  
            if (writeTimeout > 0 && (keycount == 0))  
                timedout = (System.currentTimeMillis() - time) >= writeTimeout;  
        } //while  
        if (timedout)   
            throw new SocketTimeoutException();  
    } finally {  
        poller.remove(att,SelectionKey.OP_WRITE);  
        if (timedout && key != null) {  
            poller.cancelKey(socket, key);  
        }  
    }  
    return written;  
}

也就是说当socket.write()返回0时,说明网络状态不稳定,这时将socket注册OP_WRITE事件到辅Selector,由BlockPoller线程不断轮询这个辅Selector,直到发现这个socket的写状态恢复了,通过那个倒数计数器,通知Worker线程继续写socket动作。看一下BlockSelector线程的代码逻辑:

public void run() {  
    while (run) {  
        try {  
            ......  
            Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;  
            while (run && iterator != null && iterator.hasNext()) {  
                SelectionKey sk = (SelectionKey) iterator.next();  
                KeyAttachment attachment = (KeyAttachment)sk.attachment();  
                try {  
                    attachment.access();  
                    iterator.remove(); ;  
                    sk.interestOps(sk.interestOps() & (~sk.readyOps()));  
                    if ( sk.isReadable() ) {  
                        countDown(attachment.getReadLatch());  
                    }  
                    //发现socket可写状态恢复,将倒数计数器置位,通知Worker线程继续  
                    if (sk.isWritable()) {  
                        countDown(attachment.getWriteLatch());  
                    }  
                }catch (CancelledKeyException ckx) {  
                    if (sk!=null) sk.cancel();  
                    countDown(attachment.getReadLatch());  
                    countDown(attachment.getWriteLatch());  
                }  
            }//while  
        }catch ( Throwable t ) {  
            log.error("",t);  
        }  
    }  
    events.clear();  
    try {  
        selector.selectNow();//cancel all remaining keys  
    }catch( Exception ignore ) {  
        if (log.isDebugEnabled())log.debug("",ignore);  
    }  
}

使用这个辅Selector主要是减少线程间的切换,同时还可减轻主Selector的负担。

七、关于性能

下面这份报告是我们压测的一个结果,跟想象的是不是不太一样?几乎没有差别,实际上NIO优化的是I/O的读写,如果瓶颈不在这里的话,比如传输字节数很小的情况下,BIO和NIO实际上是没有差别的。NIO的优势更在于用少量的线程hold住大量的连接。还有一点,我们在压测的过程中,遇到在NIO模式下刚开始的一小段时间内容,会有错误,这是因为一般的压测工具是基于一种长连接,也就是说比如模拟1000并发,那么同时建立1000个连接,下一时刻再发送请求就是基于先前的这1000个连接来发送,还有TOMCAT的NIO处理是有POLLER线程来接管的,它的线程数一般等于CPU的核数,如果一瞬间有大量并发过来,POLLER也会顿时处理不过来。

八、总结

NIO只是优化了网络IO的读写,如果系统的瓶颈不在这里,比如每次读取的字节说都是500b,那么BIO和NIO在性能上没有区别。NIO模式是最大化压榨CPU,把时间片都更好利用起来。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源如内存,有关线程资源可参照这篇文章《一台java服务器可以跑多少个线程》。因此,使用的线程越少越好。而I/O复用模型正是利用少量的线程来管理大量的连接。在对于维护大量长连接的应用里面更适合用基于I/O复用模型NIO,比如web qq这样的应用。所以我们要清楚系统的瓶颈是I/O还是CPU的计算。

 

相关文章
|
10天前
|
Java
【思维导图】JAVA网络编程思维升级:URL与URLConnection的逻辑梳理,助你一臂之力!
【思维导图】JAVA网络编程思维升级:URL与URLConnection的逻辑梳理,助你一臂之力!
25 1
|
11天前
|
Kubernetes 负载均衡 网络安全
Kubernetes 网络模型与实践
【8月更文第29天】Kubernetes(K8s)是当今容器编排领域的佼佼者,它提供了一种高效的方式来管理容器化应用的部署、扩展和运行。Kubernetes 的网络模型是其成功的关键因素之一,它支持服务发现、负载均衡和集群内外通信等功能。本文将深入探讨 Kubernetes 的网络模型,并通过实际代码示例来展示服务发现和服务网格的基本概念及其实现。
30 1
|
10天前
|
XML JSON 搜索推荐
【高手过招】JAVA网络编程对决:URL与URLConnection的高级玩法,你敢挑战吗?
【高手过招】JAVA网络编程对决:URL与URLConnection的高级玩法,你敢挑战吗?
32 0
|
7天前
|
网络协议 数据安全/隐私保护 网络架构
计算机网络模型
【9月更文挑战第2天】
38 24
|
5天前
|
算法
基于GA遗传优化的离散交通网络双层规划模型设计matlab仿真
该程序基于GA遗传优化设计了离散交通网络的双层规划模型,以路段收费情况的优化为核心,并通过一氧化碳排放量评估环境影响。在MATLAB2022a版本中进行了验证,显示了系统总出行时间和区域排放最小化的过程。上层模型采用多目标优化策略,下层则确保总阻抗最小,实现整体最优解。
|
8天前
|
分布式计算 负载均衡 监控
p2p网络架构模型
P2P(Peer-to-Peer)模式是一种网络架构模型,在这种模型中,每个节点(peer)既是服务的提供者也是服务的消费者。这意味着每个参与的节点都可以直接与其他节点通信,并且可以相互提供资源和服务,例如文件共享、流媒体传输等。
17 6
|
5天前
|
网络协议 安全 网络安全
C语言 网络编程(四)常见网络模型
这段内容介绍了目前被广泛接受的三种网络模型:OSI七层模型、TCP五层模型以及TCP/IP四层模型,并简述了多个网络协议的功能与特性,包括HTTP、HTTPS、FTP、DNS、SMTP、TCP、UDP、IP、ICMP、ARP、RARP及SSH协议等,同时提到了ssh的免费开源实现openssh及其在Linux系统中的应用。
|
10天前
|
Java
【实战演练】JAVA网络编程高手养成记:URL与URLConnection的实战技巧,一学就会!
【实战演练】JAVA网络编程高手养成记:URL与URLConnection的实战技巧,一学就会!
22 3
|
10天前
|
安全 Java 网络安全
【认知革命】JAVA网络编程新视角:重新定义URL与URLConnection,让网络资源触手可及!
【认知革命】JAVA网络编程新视角:重新定义URL与URLConnection,让网络资源触手可及!
24 2
|
9天前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
25 0