利用Doug Lea的并发包实现带超时机制的线程池

简介:
  jdk5引入的concurrent包来自于Doug Lea的卓越贡献。最近我在查找服务器OOM的原因之后,决定采用这个包重写应用中一个servlet,这个servlet调用了一个阻塞方法,当被阻塞之后,服务器中的线程数(因为阻塞了,后续请求不断地新增线程)突然增加导致了服务器当机,因此决定采用一个线程池,并且设置超时,如果阻塞方法超过一定时间就取消线程。因为我们的项目仍然跑在jdk 1.4.2上面,短期内不可能升级到jdk5,还是要利用这个并发包。去 这里下载源码并自己打包成jar,加入项目的lib,然后利用 PooledExecutorTimedCallable来实现我们的需求。首先是线程池,相当简单:
import  EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import  EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
 * <p>类说明:线程池</p>
 * <p>注意事项:</p>
 * <pre></pre>
 * <p>创建日期:Sep 7, 2007 1:25:33 PM</p>
@author :dennis zane
 * 
@version  $Id:$
 
*/
public   class  MyThreadPool{
    
private   static  PooledExecutor exec  =   new  PooledExecutor( new  BoundedBuffer(
            
20 ),  30 );
    
static  {
        exec.setKeepAliveTime(
1000   *   60   *   5 );
        exec.createThreads(
5 );
        exec.setMinimumPoolSize(
4 );
    }

    
public   static   void  execute( final  Runnable r)  throws InterruptedException {
    
    exec.execute(r);
   
}

    
public   static   void  shutdown() {
        exec.shutdownAfterProcessingCurrentlyQueuedTasks();
        exec 
=   null ;
    }

}


    静态初始化并设置一个PoolExecutor,设置空闲线程的存活时间为5分钟,设置最小线程数为4,最大线程数为30,一开始创建5个线程以待使用(根据各自的应用调整这些参数),另外提供了shutdown方法以供ServeltContextListener的contextDestroyed方法调用以关闭线程池。那么,结合TimedCallable来实现提交线程的超时机制,调用类似:
           //设置超时时间
           private static final long timeout = 1000;
           ......
           ......
       try{
           Callable callable  =   new  Callable() {
                
public  Object call() {
                   
return new YourProgram().run();                   
                }
            };
            TimedCallable timedCallable 
=   new  TimedCallable(callable, timeout);
            FutureResult future 
=   new  FutureResult();
            Runnable cmd 
=  future.setter(timedCallable);
            
// 提交给线程池来执行
            MyThreadPool.execute(cmd);
            //获取任务结果
            YourObject obj
=  (YourObject) future.get();
            ......
            ......
         }
catch (InterruptedException e) {
            if (e instanceof TimeoutException) {
                 log.error("任务超时");
                  ...
             }
         }catch(InvocationTargetException e)
         {
            //清理任务..
         }
         ......

如果不是很理解这段代码,那么也许你应该先看看jdk5引入的Future、FutureTask等类,或者看看这里的文档。对于超时时间的大小估算,你应当在生产环境中计算该阻塞方法的调用时间,正常运行一段时间,利用脚本语言(比如ruby、python)分析日志以得到一个调用花费时间的最大值作为timeout,这里的单位是毫秒。而线程池大小的估算,要看你提交给线程执行的任务的类型:如果是计算密集型的任务,那么线程池的大小一般是(cpu个数+1);如果是IO密集型的任务(一般的web应用皆是此类),那么估算有一个公式,
假设N-cpu是cpu个数,U是目标CPU使用率,W/C是任务的等待时间与计算时间的比率,那么最优的池大小等于:
N-threads=N-cpu*U*(1+W/C)

文章转自庄周梦蝶  ,原文发布时间

目录
相关文章
|
3月前
|
Java
如何理解网络阻塞 I/O:BIO
如何理解网络阻塞 I/O:BIO
|
11月前
|
存储 缓存 Java
【Java并发编程 十二】JUC并发包下线程池(下)
【Java并发编程 十二】JUC并发包下线程池(下)
53 0
|
11月前
|
存储 缓存 监控
【Java并发编程 十二】JUC并发包下线程池(上)
【Java并发编程 十二】JUC并发包下线程池(上)
48 0
|
存储 安全 算法
《我要进大厂》- Java并发 夺命连环10问,你能坚持到第几问?(进程&线程 | 并行&并发 | 上下文切换 | 线程死锁 | 线程创建)
《我要进大厂》- Java并发 夺命连环10问,你能坚持到第几问?(进程&线程 | 并行&并发 | 上下文切换 | 线程死锁 | 线程创建)
《我要进大厂》- Java并发 夺命连环10问,你能坚持到第几问?(进程&线程 | 并行&并发 | 上下文切换 | 线程死锁 | 线程创建)
|
Java API
java并发原理实战(3) -- 线程的中断和初始化
java并发原理实战(3) -- 线程的中断和初始化
199 0
java并发原理实战(3) -- 线程的中断和初始化
|
程序员
面试官问:如何中断一个线程,具体如何实现?(上)
面试官问:如何中断一个线程,具体如何实现?(上)
128 0
|
Java API
面试官问:如何中断一个线程,具体如何实现?(下)
面试官问:如何中断一个线程,具体如何实现?(下)
面试官问:如何中断一个线程,具体如何实现?(下)
|
Java 调度 消息中间件
09.RxJava线程调度源码分析
上一次分析了RxJava的运作流程,其中的线程调度方面只是简单提了两句,以我看来,线程调度是RxJava中非常重要的一环,所以今天单独拿出来分析一下。 subscribeOn observeOn subscribeOn调用可以将之前的操作加如线程池,从...
1093 0