Tomcat6.0连接器源码分析3

简介:

接上篇http://www.iteye.com/topic/994833
我们看到JioEndPoint的start方法有下面一段代码:
  // Create worker collection
  if (executor == null) {
      workers = new WorkerStack(maxThreads);
  }
在上一篇中,executor一直都为null。什么时候不为空呢,这里因为Server.xml文件里的Connector元素还有一个executor属性,它指向一个Executor属性能名字。(参考:
http://tomcat.apache.org/tomcat-6.0-doc/config/http.html)。
在连接器上方有一个默认的Executor元素:
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-" 
        maxThreads="150" minSpareThreads="4"/>
不过当前他是注释掉的,我们把他打开。
这个Executor默认的类是org.apache.catalina.core. StandardThreadExecutor,
当然你可以通过它的className属性使用自己的类,但必须像StandardThreadExecutor
一样实现Executor接口。

StandardThreadExecutor是在Doug lea大爷的ThreadPoolExecutor类构造出来的:
public void start() throws LifecycleException {
        lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
        lifecycle.fireLifecycleEvent(START_EVENT, null);
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf) {
   @Override
   protected void afterExecute(Runnable r, Throwable t) {
    AtomicInteger atomic = submittedTasksCount;
    if(atomic!=null) {
     atomic.decrementAndGet();
    }
   }
        };
        taskqueue.setParent( (ThreadPoolExecutor) executor);
        submittedTasksCount = new AtomicInteger();
        lifecycle.fireLifecycleEvent(AFTER_START_EVENT, null);
}

剩下的工作就是在Connector里添加属性executor,来引用tomcatThreadPool。
<Connector executor="tomcatThreadPool"
               port="8080" protocol="HTTP/1.1" 
               connectionTimeout="20000" 
               redirectPort="8443" />
这样executor就横空出世。

那么开始那段代码就不需要WorkerStack了。Tomcat自个搞了个WorkerStack出来,
还是给Doug lea大爷一个很大的面子哦。
这样processSocket不需大费周折,直接交给executor去执行就OK了:
protected boolean processSocket(Socket socket) {
        try {
            if (executor == null) {
                getWorkerThread().assign(socket);
            } else {
                executor.execute(new SocketProcessor(socket));
            }
        } catch (Throwable t) {
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

  
上一篇说讨论BIO模式的连接器,无外乎创建ServerSocket -> 绑定(Bind)端口
->接受(accept)连接->取出一个线程处理Socket 的过程。

我们继续讨论NIO模型的连接器,首先更改连接器协议为org.apache.coyote.http11. Http11NioProtocol:
  <Connector executor="tomcatThreadPool"
               port="8080" protocol="org.apache.coyote.http11. Http11NioProtocol" 
               connectionTimeout="20000" 
               redirectPort="8443" />
很简单,这样就启动了NIO模型。
与BIO模型一致,Http11NioProtocol在init方法里初始化NioEndpoint,我们讨论BIO模型时提到JioEndPoint,以后还会提到AprEndpoint。同样后续工作主要由NioEndpoint来完成。

首先看NioEndpoint的init方法:
/**
     * Initialize the endpoint.
     */
    public void init()
        throws Exception {

        if (initialized)
            return;

 serverSock = ServerSocketChannel.open();
        serverSock.socket().setPerformancePreferences(socketProperties.getPerformanceConnectionTime(),
                                                      socketProperties.getPerformanceLatency(),
                                                      socketProperties.getPerformanceBandwidth());
        InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
        serverSock.socket().bind(addr,backlog); 
        serverSock.configureBlocking(true); //mimic APR behavior
        serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);
        //SSL相关部分我们跳过
        if (oomParachute>0) reclaimParachute(true);
        selectorPool.open();
        initialized = true;
}
首先创建ServerSocketChannel,绑定监听端口。这里有一点不同之外,在NIO模式下,我们传统的方式像这样建立服务监听:
serverSock = ServerSocketChannel.open();
 serverSock.socket().bind(new InetSocketAddress(8888));
Selector selector = Selector.open();
serverSock.configureBlocking(false);
serverSock.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
  //处理连接请求
}
但Tomcat采用了Blocking模式接收连接请求,在读写的时候采用No-Blocking模式。这种做法和weblogic的做法一致(我反编译看到的),像MINA等开源框架还是采用传统的方式。
我想这里有两个解释,一是接收线程的工作就是等待新的连接,没其他事可以,用No-Blocking已没有意义,另外selector.select()在一些OS上还会出现CPU 100%的空转现象,如果有其他见解,请告知我,在此感谢。
代码行设定了两个变量:acceptorThreadCount,pollerThreadCount。 前者是acceptor线程的个数,后者是读写线程的个数。这个模型是Dong Lea在《Scalable IO in Java》中的“Using Multiple Reactors”。acceptorThreadCount一般设定为1,而pollerThreadCount Tomcat给的默认值为CPU个数(Runtime.getRuntime().availableProcessors());

初始化完毕,我们接着看start方法:
public void start()
        throws Exception {
        // Initialize socket if not done before
        if (!initialized) {
            init();
        }
        if (!running) {
            running = true;
            paused = false;
            
            // Create worker collection
            if (getUseExecutor()) {
                if ( executor == null ) {
                    TaskQueue taskqueue = new TaskQueue();
                    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
                    taskqueue.setParent( (ThreadPoolExecutor) executor, this);
                }
            } else if ( executor == null ) {//avoid two thread pools being created
                workers = new WorkerStack(maxThreads);
            }

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            // Start acceptor threads
            for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(daemon);
                acceptorThread.start();
            }
        }
    }
到行创建工作线程集合,用来处理请求过来的socket,这些在上两篇我们已经讨论过了。
有一点不同,在配置BIO连接器时,连接器元素有executor属性,它指向一个Executor属性的名字,在NIO模型又加了一个useExecutor属性。在executor为空的时候,useExecutor会起作用。默认情况下useExecutor为true,为什么这么做,tomcat解释是:
Set to true to use the NIO thread pool executor. The default value is true. If set to false, it uses a thread pool based on a stack for its execution. Generally, using the executor yields a little bit slower performance, but yields a better fairness for processing connections in a high load environment as the traffic gets queued through a FIFO queue. If set to true(default) then the max pool size is the maxThreads attribute and the core pool size is the minSpareThreads。
大致是说用WorkerStack性能虽然会比用ThreadPoolExecutor好一些,但ThreadPoolExecutor采用FIFO队列,提供了更好的公平性。这在高并发情况下会取得比较好的效里。

到行创建并启动Acceptor线程用接收请求,到行创建并启动读写线程,读取request和发送respone.先看Acceptor线程

protected class Acceptor implements Runnable {
        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                // Loop if endpoint is paused
                while (paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                try {
                    // Accept the next incoming connection from the server socket
                    SocketChannel socket = serverSock.accept();
                    // Hand this socket off to an appropriate processor
                    //TODO FIXME - this is currently a blocking call, meaning we will be blocking
                    //further accepts until there is a thread available.
                    if ( running && (!paused) && socket != null ) {
                        //processSocket(socket);
                        if (!setSocketOptions(socket)) {
                            try {
                                socket.socket().close();
                                socket.close();
                            } catch (IOException ix) {
                                if (log.isDebugEnabled())
                                    log.debug("", ix);
                            }
                        } 
                    }
                }catch (SocketTimeoutException sx) {
                    //normal condition
                }catch ( IOException x ) {
                    if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){}
                        }catch (Throwable letsHopeWeDontGetHere){}
                    }
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }//while
        }//run
    }
没什么好说的,和BIO模式一样,主要看setSocketOptions方法:
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = nioChannels.poll();
            if ( channel == null ) {
                // SSL 部分去除了
                  // normal tcp setup
                 NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
                                                                       socketProperties.getAppWriteBufSize(),
                                                                       socketProperties.getDirectBuffer());

                 channel = new NioChannel(socket, bufhandler);
            } else {                
                channel.setIOChannel(socket);
                if ( channel instanceof SecureNioChannel ) {
                    SSLEngine engine = createSSLEngine();
                    ((SecureNioChannel)channel).reset(engine);
                } else {
                    channel.reset();
                }
            }
            getPoller0().register(channel);
        } catch (Throwable t) {
            try {
                log.error("",t);
            }catch ( Throwable tt){}
            // Tell to close the socket
            return false;
        }
        return true;
    }
这里有一个NioChannel 对像池nioChannels,主要目的大概是重用对像,降低GC,因为
NioBufferHandler是需要申请内存空间的。把内存空间清0重用比重新向操作系统申请要快一些。NioChannel只是SocketChannel的包装器,目的是使得SSL socket channel与
Non-SSL socket channel保持逻辑上的一致。

getPoller0采用轮循的方式取出一个Poller即读写线程。我们主要看一下Poller的register(channel)方法:

public void register(final NioChannel socket)
        {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment();
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }
首先构造一个KeyAttachment,然后构造一个PollerEvent事件,并添加到事件队列。
记住这里Tomcat虚拟了一个OP_REGISTER。待续…
 


本文转自 anranran 51CTO博客,原文链接:http://blog.51cto.com/guojuanjun/541028

相关文章
|
4月前
|
XML 应用服务中间件 Apache
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
|
12月前
|
Arthas 弹性计算 安全
优雅上下线之如何安全的关闭Tomcat持久连接
优雅上下线之如何安全的关闭Tomcat持久连接
361 3
|
11月前
|
Web App开发 应用服务中间件
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
168 0
|
1天前
|
人工智能 前端开发 Java
【Tomcat源码分析】启动过程深度解析 (二)
本文深入探讨了Tomcat启动Web应用的过程,重点解析了其加载ServletContextListener及Servlet的机制。文章从Bootstrap反射调用Catalina的start方法开始,逐步介绍了StandardServer、StandardService、StandardEngine、StandardHost、StandardContext和StandardWrapper的启动流程。每个组件通过Lifecycle接口协调启动,子容器逐层启动,直至整个服务器完全启动。此外,还详细分析了Pipeline及其Valve组件的作用,展示了Tomcat内部组件间的协作机制。
【Tomcat源码分析】启动过程深度解析 (二)
|
14天前
|
前端开发 Java 应用服务中间件
【Tomcat源码分析 】"深入探索:Tomcat 类加载机制揭秘"
本文详细介绍了Java类加载机制及其在Tomcat中的应用。首先回顾了Java默认的类加载器,包括启动类加载器、扩展类加载器和应用程序类加载器,并解释了双亲委派模型的工作原理及其重要性。接着,文章分析了Tomcat为何不能使用默认类加载机制,因为它需要解决多个应用程序共存时的类库版本冲突、资源共享、类库隔离及JSP文件热更新等问题。最后,详细展示了Tomcat独特的类加载器设计,包括Common、Catalina、Shared、WebApp和Jsp类加载器,确保了系统的稳定性和安全性。通过这种设计,Tomcat实现了不同应用程序间的类库隔离与共享,同时支持JSP文件的热插拔。
【Tomcat源码分析 】"深入探索:Tomcat 类加载机制揭秘"
|
15天前
|
设计模式 应用服务中间件 容器
【Tomcat源码分析】Pipeline 与 Valve 的秘密花园
本文深入剖析了Tomcat中的Pipeline和Valve组件。Valve作为请求处理链中的核心组件,通过接口定义了关键方法;ValveBase为其基类,提供了通用实现。Pipeline则作为Valve容器,通过首尾相连的Valve链完成业务处理。StandardPipeline实现了Pipeline接口,提供了详细的Valve管理逻辑。通过对代码的详细分析,揭示了模板方法模式和责任链模式的应用,展示了系统的扩展性和模块化设计。
【Tomcat源码分析】Pipeline 与 Valve 的秘密花园
|
17天前
|
设计模式 人工智能 安全
【Tomcat源码分析】生命周期机制 Lifecycle
Tomcat内部通过各种组件协同工作,构建了一个复杂的Web服务器架构。其中,`Lifecycle`机制作为核心,管理组件从创建到销毁的整个生命周期。本文详细解析了Lifecycle的工作原理及其方法,如初始化、启动、停止和销毁等关键步骤,并展示了LifecycleBase类如何通过状态机和模板模式实现这一过程。通过深入理解Lifecycle,我们可以更好地掌握组件生命周期管理,提升系统设计能力。欢迎关注【码上遇见你】获取更多信息,或搜索【AI贝塔】体验免费的Chat GPT。希望本章内容对你有所帮助。
|
1月前
|
网络协议 Java 应用服务中间件
Tomcat源码分析 (一)----- 手撕Java Web服务器需要准备哪些工作
本文探讨了后端开发中Web服务器的重要性,特别是Tomcat框架的地位与作用。通过解析Tomcat的内部机制,文章引导读者理解其复杂性,并提出了一种实践方式——手工构建简易Web服务器,以此加深对Web服务器运作原理的认识。文章还详细介绍了HTTP协议的工作流程,包括请求与响应的具体格式,并通过Socket编程在Java中的应用实例,展示了客户端与服务器间的数据交换过程。最后,通过一个简单的Java Web服务器实现案例,说明了如何处理HTTP请求及响应,强调虽然构建基本的Web服务器相对直接,但诸如Tomcat这样的成熟框架提供了更为丰富和必要的功能。
|
4月前
|
前端开发 Java 应用服务中间件