三、4类常见的线程池
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
其实他们都是通过ThreadPoolExcutors创建的
1.CachedThreadPool
由Executors的newCachedThreadPool方法创建,不存在核心线程,只存在数量不定的非核心线程,而且其数量最大值为Integer.MAX_VALUE。当线程池中的线程都处于活动时(全满),线程池会创建新的线程来处理新的任务,否则就会利用新的线程来处理新的任务,线程池中的空闲线程都有超时机制,默认超时时长为60s,超过60s的空闲线程就会被回收。和FixedThreadPool不同的是,CachedThreadPool的任务队列其实相当于一个空的集合,这将导致任何任务都会被执行,因为在这种场景下SynchronousQueue是不能插入任务的,SynchronousQueue是一个特殊的队列,在很多情况下可以理解为一个无法储存元素的队列。从CachedThreadPool的特性看,这类线程比较适合执行大量耗时较小的任务。当整个线程池都处于闲置状态时,线程池中的线程都会因为超时而被停止回收,几乎是不占任何系统资源。实现方式如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
2.FixedThreadPool
由Executors的newFixedThreadPool方法创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。FixedThreadPool只有核心线程,且该核心线程都不会被回收,这意味着它可以更快地响应外界的请求。jdk实现如下:FixedThreadPool没有额外线程,只存在核心线程,而且核心线程没有超时机制,而且任务队列没有长度的限制。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
3.ScheduledThreadPool
通过Executors的newScheduledThreadPool方式创建,核心线程数量是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时它会被立即回收,ScheduledThreadPool这类线程池主要用于执行定时任务和具有固定时期的重复任务,实现方法如下:
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }
4.SingleThreadPool
通过Executors的newSingleThreadExecutor方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor的意义在于统一所有外界任务一个线程中,这使得这些任务之间不需要处理线程同步的问题,实现方式如下:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
四、为什么《阿里巴巴Java开发手册》上要禁止使用Executors来创建线程池
Executors创建出来的线程池使用的全都是无界队列,而使用无界队列会带来很多弊端,最重要的就是,它可以无限保存任务,因此很有可能造成OOM异常。同时在某些类型的线程池里面,使用无界队列还会导致maxinumPoolSize、keepAliveTime、handler等参数失效
五、我们用代码测试下
import java.util.*; import java.lang.*; import java.io.Serializable; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; class Rextester { private static int produceTaskSleepTime = 5; private static int consumeTaskSleepTime = 1000; private static int taskMaxNumber = 10; //定义最大添加10个线程到线程池中 public static void main(String args[]) { //构造一个线程池 //该策略默默的丢弃无法处理的任务,不予任何处理。 ThreadPoolExecutor threadPool = getTpe("DiscardPolicy"); //该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。 //ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy"); for (int i = 1; i <= taskMaxNumber; i++) { try { //添加任务到线程池,我们要知道添加的任务是Runnable String value = "value is: " + i; System.out.println("put:" + value); threadPool.execute(new ThreadPoolTask(value)); //线程休息一段时间,方便我们分析结果 Thread. sleep(produceTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } } } /** * 线程池执行的任务 */ public static class ThreadPoolTask implements Runnable { //保存任务所需要的数据 private String value; public ThreadPoolTask(String value) { this.value = value; } public void run() { //打印语句 System. out.println("start------" + value); try { Thread. sleep(consumeTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } value = ""; } } /** *初始化线程池 */ public static ThreadPoolExecutor getTpe(String type) { if ("DiscardOldestPolicy".equals(type)) { return new ThreadPoolExecutor(2, 4, 3, TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); } else { return new ThreadPoolExecutor(2, 4, 3, TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardPolicy()); } //DiscardPolicy DiscardOldestPolicy } }
运行结果:
put:value is: 1 start------value is: 1 put:value is: 2 start------value is: 2 put:value is: 3 put:value is: 4 put:value is: 5 put:value is: 6 start------value is: 6 put:value is: 7 start------value is: 7 put:value is: 8 put:value is: 9 put:value is: 10 start------value is: 3 start------value is: 4 start------value is: 5
我们把上面的生成线程池的代码不//
ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
运行结果如下
put:value is: 1 start------value is: 1 put:value is: 2 start------value is: 2 put:value is: 3 put:value is: 4 put:value is: 5 put:value is: 6 start------value is: 6 put:value is: 7 start------value is: 7 put:value is: 8 put:value is: 9 put:value is: 10 start------value is: 8 start------value is: 9 start------value is: 10