一个高效的Schedular

简介:
Scheduler

package com.schedular; 

import java.util.concurrent.ScheduledFuture; 
import java.util.concurrent.ScheduledThreadPoolExecutor; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

/** 
* A scheduler that internally uses a thread pool to permit concurrent execution    
* of scheduled tasks. This scheduler is preferable to a {@link java.util.Timer}    
* when the execution of one task may block long enough to delay execution of    
* other tasks. 
*/
 
public  class Scheduler  implements ShutdownCallback { 
         
         /** 
         * Specify no initial delay when scheduling a task.    
         */
 
         public  static  final  long NO_INITIAL_DELAY = 0; 
         
         private  final ScheduledThreadPoolExecutor _executor; 
         
         /** 
         * Creates a scheduler. 
         * 
         * @param poolSize The thread pool size. 
         * @throws IllegalArgumentException if the pool size is less than or equal to zero. 
         */
 
         public Scheduler( int poolSize) { 
                 if (poolSize <= 0) { 
                         throw  new IllegalArgumentException( "illegal pool size: "+poolSize); 
                } 
                 
                LoggingThreadGroup threadGroup =  new LoggingThreadGroup( "SchedulerGroup"); 
                 
                 // set pool threads as daemons in case an orderly shutdown does not occur 
                ThreadGroupFactory tFactory=    
                         new ThreadGroupFactory(threadGroup,  "Scheduler-"); 
                tFactory.createDaemonThreads( true);             
                 
                _executor =    
                      new ScheduledThreadPoolExecutor(poolSize,    
                                                                                     tFactory,    
                                                                                      new ThreadPoolExecutor.DiscardPolicy()); 
                 
                 // delayed tasks should not execute after shutdown 
                _executor.setExecuteExistingDelayedTasksAfterShutdownPolicy( false); 
                
              // prestart one thread to make sure the first execution is timely 
             _executor.prestartCoreThread(); 
        } 
         
         /** 
         * Creates and executes a periodic action that becomes enabled first after    
         * the given initial delay, and subsequently with the given period; that    
         * is executions will commence after initialDelay then initialDelay+period,    
         * then initialDelay + 2 * period, and so on. If any execution of the task    
         * encounters an exception, subsequent executions are suppressed. Otherwise,    
         * the task will only terminate via cancellation or termination of the    
         * executor. If any execution of this task takes longer than its period,    
         * then subsequent executions may start late, but will not concurrently    
         * execute. 
         *    
         * @param task The task to execute. 
         * @param initialDelay The initial delay (in msec) before the first execution. 
         * @param period The period (in msec) between successive executions. 
         * @return A ScheduledFuture that may be used to cancel task execution. 
         */
 
         public ScheduledFuture scheduleAtFixedRate(Runnable task,  long initialDelay,  long period) { 
                 return _executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS); 
        } 
         
         /** 
         * Creates and executes a periodic action that becomes enabled first after    
         * the given initial delay, and subsequently with the given delay between    
         * the termination of one execution and the commencement of the next.    
         * If any execution of the task encounters an exception, subsequent    
         * executions are suppressed. Otherwise, the task will only terminate via    
         * cancellation or termination of the executor. 
         *    
         * @param task The task to execute. 
         * @param initialDelay The initial delay (in msec) before the first execution. 
         * @param delay The delay (in msec) between termination of one execution    
         *                            and commencement of the next. 
         * @return A ScheduledFuture that may be used to cancel task execution. 
         */
 
         public ScheduledFuture scheduleWithFixedDelay(Runnable task,  long initialDelay,  long delay) { 
                 return _executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS); 
        } 
         
         /** 
         * Tries to remove from the work queue all {@link Future} 
         * tasks that have been cancelled. This method can be useful as a 
         * storage reclamation operation, that has no other impact on 
         * functionality. Cancelled tasks are never executed, but may 
         * accumulate in work queues until worker threads can actively 
         * remove them. Invoking this method instead tries to remove them now. 
         * However, this method may fail to remove tasks in 
         * the presence of interference by other threads. 
         */
 
         public  void purgeTasks() { 
                _executor.purge(); 
        } 
                 
         /** 
         * Shut down the scheduler in an orderly manner, allowing any currently    
         * executing tasks to complete. 
         */
 
         public  void shutdown() { 
                _executor.shutdown(); 
                                 
                 // block at most for 5 seconds for any currently executing task to terminate 
                 try { 
                        _executor.awaitTermination(5, TimeUnit.SECONDS); 
                }  catch (InterruptedException e) { 
                         // do nothing 
                } 
        } 


 
ThreadGroupFactory
import java.util.concurrent.ThreadFactory; 

public  class ThreadGroupFactory    
         implements ThreadFactory 

         private ThreadGroup _group; 
         private String            _namePrefix; 
         private  int                 _numThreads; 
         private  boolean         _createDaemonThreads; 
         private  final Object            _syncLock =  new Object(); 
         
         /** 
         * Creates an instance where the threads created by this factory are    
         * assigned to the specified ThreadGroup. 
         * 
         * @param group The ThreadGroup. 
         * @param namePrefix The name prefix for each thread created by this factory. 
         */
 
         public ThreadGroupFactory(ThreadGroup group, String namePrefix) { 
                _group            = group; 
                _namePrefix = namePrefix; 
                _numThreads = 0; 
        } 
         
         /** 
         * Creates an instance where the threads created by this factory are    
         * assigned to the current thread's ThreadGroup. 
         * 
         * @param namePrefix The name prefix for each thread created by this factory. 
         */
 
         public ThreadGroupFactory(String namePrefix) { 
                 this(Thread.currentThread().getThreadGroup(), namePrefix); 
        } 
         
         /** 
         * Set the threads created by this factory to be daemon threads. 
         *    
         * @param daemonThreads <code>true</code> to set threads created by this    
         *                                            factory to be daemon threads. 
         */
 
         public  void createDaemonThreads( boolean daemonThreads) { 
                 synchronized (_syncLock) { 
                        _createDaemonThreads = daemonThreads;                         
                } 
        } 
         
         public Thread newThread(Runnable r) { 
                String name; 
                 boolean daemon; 
                 
                 synchronized (_syncLock) { 
                        name = _namePrefix + ++_numThreads;    
                        daemon = _createDaemonThreads; 
                } 
                 
                Thread thread =  new Thread(_group, r, name); 
                thread.setDaemon(daemon); 
                 
                 return thread; 
        } 

 
LoggingThreadGroup
/** 
* Create your threads using this ThreadGroup to have uncaught exceptions 
* logged via Log4j    
*/
 
public  class LoggingThreadGroup    
         extends ThreadGroup 

         private  final Log _log = LogFactory.getLog(LoggingThreadGroup. class); 

         public LoggingThreadGroup(String groupName) { 
                 super(groupName); 
        } 

         public  void uncaughtException(Thread t, Throwable exc) { 
                _log.warn( "ThreadGroup[" +  this.getName() + 
                                     "]: Unhandled exception", exc); 
                 super.uncaughtException(t, exc); 
        } 
}
 
Junit
/* 
* NOTE: This copyright does *not* cover user programs that use HQ 
* program services by normal system calls through the application 
* program interfaces provided as part of the Hyperic Plug-in Development 
* Kit or the Hyperic Client Development Kit - this is merely considered 
* normal use of the program, and does *not* fall under the heading of 
* "derived work". 

* Copyright (C) [2004-2008], Hyperic, Inc. 
* This file is part of HQ. 

* HQ is free software; you can redistribute it and/or modify 
* it under the terms version 2 of the GNU General Public License as 
* published by the Free Software Foundation. This program is distributed 
* in the hope that it will be useful, but WITHOUT ANY WARRANTY; without 
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
* PARTICULAR PURPOSE. See the GNU General Public License for more 
* details. 

* You should have received a copy of the GNU General Public License 
* along with this program; if not, write to the Free Software 
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 
* USA. 
*/
 

package org.hyperic.hq.application; 

import junit.framework.TestCase; 
import java.util.concurrent.ScheduledFuture; 

/** 
* Tests the Scheduler class. 
*/
 
public  class Scheduler_test  extends TestCase { 

         public Scheduler_test(String name) { 
                 super(name); 
        } 
                 
         public  void testIllegalPoolSize() { 
                 try { 
                         new Scheduler(-1); 
                        fail( "Expected IllegalArgumentException."); 
                }  catch (IllegalArgumentException e) { 
                         // expected outcome 
                }  catch (Exception e) { 
                        fail( "Expected IllegalArgumentException instead of:"+e); 
                } 
                 
                 try { 
                         new Scheduler(0); 
                        fail( "Expected IllegalArgumentException."); 
                }  catch (IllegalArgumentException e) { 
                         // expected outcome 
                }  catch (Exception e) { 
                        fail( "Expected IllegalArgumentException instead of:"+e); 
                } 
        } 
         
         public  void testExecuteAtFixedRate()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                                 
                RunnableCounter counter =  new RunnableCounter(50,  false); 
                 
                ScheduledFuture future =    
                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 
                 
                Thread.sleep(210); 
                 
                assertFalse(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                assertEquals(3, counter.numRuns()); 
                 
                scheduler.shutdown(); 
        } 
         
         public  void testExecuteWithFixedDelay()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                 
                RunnableCounter counter =  new RunnableCounter(100,  false); 
                 
                 // the execution period is about 200 msec (delay+runtime) 
                ScheduledFuture future =    
                        scheduler.scheduleWithFixedDelay(counter, Scheduler.NO_INITIAL_DELAY, 100); 
                 
                Thread.sleep(210); 
                 
                assertFalse(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                assertEquals(2, counter.numRuns()); 
                 
                scheduler.shutdown();                 
        } 
         
         public  void testCancellingScheduledTask()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                 
                RunnableCounter counter =  new RunnableCounter(50,  false); 
                 
                ScheduledFuture future =    
                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 
                 
                Thread.sleep(210); 
                 
                assertFalse(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                future.cancel( true); 
                 
                assertTrue(future.isDone()); 
                assertTrue(future.isCancelled()); 
                 
                Thread.sleep(100); 
                 
                assertEquals(3, counter.numRuns()); 
                 
                scheduler.shutdown(); 
        } 
         
         public  void testExecuteAtFixedRateWithInitialDelay()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                 
                RunnableCounter counter =  new RunnableCounter(50,  false); 
                 
                ScheduledFuture future =    
                        scheduler.scheduleAtFixedRate(counter, 100, 100); 
                 
                Thread.sleep(250); 
                 
                assertFalse(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                assertEquals(2, counter.numRuns()); 
                 
                scheduler.shutdown();                 
        } 
         
         public  void testExecuteWithFixedDelayWithInitialDelay()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                 
                RunnableCounter counter =  new RunnableCounter(50,  false); 
                 
                ScheduledFuture future =    
                        scheduler.scheduleWithFixedDelay(counter, 100, 50); 
                 
                Thread.sleep(300); 
                 
                assertFalse(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                assertEquals(2, counter.numRuns()); 
                 
                scheduler.shutdown();                 
        } 
         
         /** 
         * Test executing one task at a fixed rate and one task with a fixed delay. 
         *    
         * @throws Exception 
         */
 
         public  void testExecuteConcurrentTasks()  throws Exception { 
                Scheduler scheduler =  new Scheduler(2); 
                 
                RunnableCounter counter1 =  new RunnableCounter(10,  false); 

                RunnableCounter counter2 =  new RunnableCounter(10,  false); 
                                 
                ScheduledFuture future1 =    
                        scheduler.scheduleAtFixedRate(counter1, Scheduler.NO_INITIAL_DELAY, 50); 
                 
                 // the execution period is about 60 msec (delay+runtime) 
                ScheduledFuture future2 =    
                        scheduler.scheduleWithFixedDelay(counter2, Scheduler.NO_INITIAL_DELAY, 50); 
                 
                Thread.sleep(210); 
                 
                assertFalse(future1.isDone()); 
                assertFalse(future1.isCancelled()); 

                assertFalse(future2.isDone()); 
                assertFalse(future2.isCancelled()); 
                 
                assertEquals(5, counter1.numRuns()); 

                assertEquals(4, counter2.numRuns()); 
                 
                scheduler.shutdown();                                
        } 
         
         /** 
         * Test that a scheduled task will stop executing if it throws an    
         * unchecked exception. 
         */
 
         public  void testUncheckedExceptionInTask()  throws Exception { 
                Scheduler scheduler =  new Scheduler(1); 
                 
                RunnableCounter counter =  new RunnableCounter(50,  true); 
                 
                ScheduledFuture future =    
                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 
                 
                Thread.sleep(210); 
                 
                assertTrue(future.isDone()); 
                assertFalse(future.isCancelled()); 
                 
                 // only should have run once since a runtime exception was thrown 
                assertEquals(1, counter.numRuns()); 
                 
                scheduler.shutdown();                 
        } 
         
         private  class RunnableCounter  implements Runnable { 

                 private  final  long _sleepTime; 
                 private  int _numRuns; 
                 private  final  boolean _throwException; 
                 private  final Object _lock =  new Object(); 
                 
                 public RunnableCounter( long sleepTime,  boolean throwUncheckedException) { 
                        _sleepTime = sleepTime; 
                        _throwException = throwUncheckedException; 
                } 
                 
                 public  void run() { 
                         synchronized ( this) { 
                                _numRuns++; 
                        } 
                         
                         try { 
                                Thread.sleep(_sleepTime); 
                        }  catch (InterruptedException e) { 
                        } 
                         
                         if (_throwException) { 
                                 throw  new RuntimeException( "unchecked exception"); 
                        } 
                } 
                 
                 public  synchronized  int numRuns() { 
                         return _numRuns; 
                } 
                 
        } 
         



    本文转自danni505 51CTO博客,原文链接:http://blog.51cto.com/danni505/204896,如需转载请自行联系原作者



相关文章
|
3月前
|
JavaScript 前端开发 安全
Angular Renderer2 的作用和使用场景介绍
Angular Renderer2 的作用和使用场景介绍
47 0
|
5月前
|
前端开发 JavaScript 搜索推荐
什么是 Angular 应用的 re-hydration 过程
什么是 Angular 应用的 re-hydration 过程
22 1
|
6月前
|
JavaScript 前端开发 编译器
Angular 里的 Module 增强
Angular 里的 Module 增强
30 0
|
6月前
Angular ModuleWithProviders 类型的使用场景介绍
Angular ModuleWithProviders 类型的使用场景介绍
31 0
|
6月前
|
JavaScript 前端开发 搜索推荐
什么是 Angular 应用的 rerender 机制
什么是 Angular 应用的 rerender 机制
31 0
|
7月前
|
JavaScript 前端开发 编译器
关于 index.ts 在大型 Angular 项目中的应用
关于 index.ts 在大型 Angular 项目中的应用
44 0
angular36-todoMVC准备工作
angular36-todoMVC准备工作
66 0
angular36-todoMVC准备工作
angular5-angular特性简介
angular5-angular特性简介
85 0
angular5-angular特性简介
|
前端开发
Angular 应用是怎么工作的?
你是否好奇 Angular 应用背后场景都发生了什么?
Angular 应用是怎么工作的?
|
缓存 运维 Kubernetes
带你畅游k8s schedular
k8s 作为云原生最重要的基石之一,她是怎么运作的呢?你是否了解过她是怎么从众多的 node 节点中筛选出符合 pod 的调度节点,这里会从 k8s 的调度原理和流程开始结合源码内容带你了解整个调度过程。
620 0
带你畅游k8s schedular