阻塞队列workQueue选择
任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在Java中需要实现BlockingQueue接口。但Java已经为我们提供了7种阻塞队列的实现:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)
- LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为Integer.MAX_VALUE
- PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
- DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
- SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
- LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样FIFO(先进先出),也可以像栈一样FILO(先进后出)。
- LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的结合体,但是把它用在ThreadPoolExecutor中,和LinkedBlockingQueue行为一致,但是是无界的阻塞队列。
注意有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置maximumPoolSize没有任何意义
拒绝策略(handler)
当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现RejectedExecutionHandler
接口,并实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)
方法。不过Executors框架已经为我们实现了4种拒绝策略:
- AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
- CallerRunsPolicy:在任务被拒绝添加后,会由调用execute方法的的主线程来执行被拒绝的任务
- DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
- DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。
四种策略都可以进行选择
线程池状态与切换
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; //runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性 static final int RUNNING = 0;//当创建线程池后,初始时,线程池处于RUNNING状态 //如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕 static final int SHUTDOWN = 1; //如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务 static final int STOP = 2; //当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态 static final int TERMINATED = 3;
线程池常用方法
线程池的使用示例如下:
// 创建线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) // 向线程池提交任务 threadPool.execute(new Runnable() { @Override public void run() { ... // 线程执行的任务 } }); // 关闭线程池 threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程 // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表 threadPool.shutdownNow();
在ThreadPoolExecutor类中有几个非常重要的方法:
- execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行
- submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown()和shutdownNow()
是用来关闭线程池的。shutdown不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务, shutdownNow立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
还有很多其他的方法:比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()
等获取与线程池相关属性的方法
线程池执行策略
描述一下线程池工作的原理,同时对上面的参数有一个更深的了解。其工作原理流程图如下图片来源:
可以简单的总结如下:
- 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,
- 若当前任务数<workQueue容量,添加成功,则该任务会等待空闲线程将其取出去执行;
- 若当前任务数>workQueue容量,添加失败,则会尝试创建新的线程去执行这个任务 - 如果当前线程池中的线程数目没有达到maximumPoolSize,则会创建新线程执行任务,并且根据keepAlive设置的闲置时间会自动销毁
- 如果当前线程池中的线程数目和任务队列都满了,则会采取任务拒绝策略进行处理;
需要注意,如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
预置线程池
Executors中为我们预置了几种线程池,而让我们不必考虑上述线程池的一些参数,可以理解为一些最佳实践,这里列举一下以及简单介绍下它们的作用,定长线程池(FixedThreadPool), 定时线程池(ScheduledThreadPool ),可缓存线程池(CachedThreadPool)单线程化线程池(SingleThreadExecutor)四种
- 定长线程池,只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列,应用场景主要是控制线程最大并发数,可持续发展,节约资源
- 定时线程池,核心线程数量固定,非核心线程数量无限,执行完闲置10ms后回收或周期性的执行任务,任务队列为延时阻塞队列,应用场景主要是执行定时或周期性的任务,定时周期执行
- 可缓存线程池,特点是无核心线程,非核心线程数量无限,执行完闲置60s后回收,任务队列为不存储元素的阻塞队列,应用场景主要为执行大量、耗时少的任务,短平快,短期大量,60s的缓存时间
- 单线程化线程池, 只有1个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列,应用场景为不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等,一次只让一个线程干,安全稳定
以上这些只是Java预置的,但存在一些问题:
- FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
- CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
其实还是按照适合自己的场景自定义比较好。
死锁问题及解决方案
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- 死锁的问题如何产生,发生条件,死锁的示例
- 死锁如何解决
接下来我们看这部分的内容。
产生死锁的必要条件
所谓死锁,是指 多个进程循环等待它方占有的资源而无限期地僵持下去的局面。很显然,如果没有外力的作用,那麽死锁涉及到的各个进程都将永远处于封锁状态,只要下面四个条件有一个不具备,系统就不会出现死锁。
- 互斥条件,即某个资源在一段时间内只能由一个进程占有,不能同时被两个或两个以上的进程占有。这种独占资源如CD-ROM驱动器,打印机等等,必须在占有该资源的进程主动释放它之后,其它进程才能占有该资源。这是由资源本身的属性所决定的。如独木桥就是一种独占资源,两方的人不能同时过桥。
- 不可抢占条件,进程所获得的资源在未使用完毕之前,资源申请者不能强行地从资源占有者手中夺取资源,而只能由该资源的占有者进程自行释放。如过独木桥的人不能强迫对方后退,也不能非法地将对方推下桥,必须是桥上的人自己过桥后空出桥面(即主动释放占有资源),对方的人才能过桥。
- 占有且申请条件,进程至少已经占有一个资源,但又申请新的资源;由于该资源已被另外进程占有,此时该进程阻塞;但是,它在等待新资源之时,仍继续占用已占有的资源。还以过独木桥为例,甲乙两人在桥上相遇。甲走过一段桥面(即占有了一些资源),还需要走其余的桥面(申请新的资源),但那部分桥面被乙占有(乙走过一段桥面)。甲过不去,前进不能,又不后退;乙也处于同样的状况。
- 循环等待条件,存在一个进程等待序列{P1,P2,…,Pn},其中P1等待P2所占有的某一资源,P2等待P3所占有的资源源,…,而Pn等待P1所占有的的某一资源,形成一个进程循环等待环。就像前面的过独木桥问题,甲等待乙占有的桥面,而乙又等待甲占有的桥面,从而彼此循环等待。
上面我们提到的这四个条件在死锁时会同时发生。也就是说,只要有一个必要条件不满足,则死锁就可以排除,只要破坏这四个必要条件中的任意一个条件,死锁就不会发生。这就为我们解决死锁问题提供了可能。以下是一个死锁的示例:
public class TestMian { //A、B 表示两把锁 String A = "A"; String B = "B"; public static void main(String[] args) { TestMian testMian = new TestMian(); new Thread(()->{ try { testMian.a(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { testMian.b(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } public void a() throws InterruptedException { //持有锁A后,尝试持有锁B ***********重点************** synchronized (A){ System.out.println("A"); TimeUnit.SECONDS.sleep(1); synchronized (B){ System.out.println("B"); } } } public void b() throws InterruptedException { //持有锁B后,尝试持有锁A ***********重点************** synchronized (B){ System.out.println("B"); TimeUnit.SECONDS.sleep(1); synchronized (A){ System.out.println("A"); } } } }
解决死锁问题
有两种策略和几种方式解决死锁的问题,一种是从代码编写上处理,另一种是从资源分配上处理,从代码的逻辑上去确定加锁的顺序和方式,避免死锁可以概括成三种方法:
- 固定加锁的顺序(针对锁顺序死锁)
- 开放调用(针对对象之间协作造成的死锁)
- 使用定时锁 tryLock();使用显式 Lock锁,在获取锁时使用 tryLock()方法。当等待超过时限的时候,tryLock()不会一直等待,而是返回错误信息。使用tryLock()能够有效避免死锁问题。
例如尽量使用ReentrantLock,使用它的锁可中断,定时中断机制。
public class DiffLockWithReentrantLock { private int amount; private final Lock lock = new ReentrantLock(); public DiffLockWithReentrantLock(int amount){ this.amount=amount; } private void transfer(DiffLockWithReentrantLock target, int transferAmount) throws InterruptedException { while (true) { if (this.lock.tryLock()) { try { if (target.lock.tryLock()) { try { if(amount< transferAmount){ System.out.println("余额不足!"); }else{ amount=amount-transferAmount; target.amount=target.amount+transferAmount; } break; } finally { target.lock.unlock(); } } } finally { this.lock.unlock(); } } //随机sleep一定的时间,保证可以释放掉锁 Thread.sleep(1000+new Random(1000L).nextInt(1000)); } } }
第二种通过安全序列解决,所谓系统是安全的,是指系统中的所有进程能够按照某一种次序分配资源,并且依次地运行完毕,这种进程序列{P1,P2,…,Pn}就是安全序列。如果存在这样一个安全序列,则系统是安全的;如果系统不存在这样一个安全序列,则系统是不安全的。
安全序列{P1,P2,…,Pn}是这样组成的:若对于每一个进程Pi,它需要的附加资源可以被系统中当前可用资源加上所有进程Pi当前占有资源之和所满足,则{P1,P2,…,Pn}为一个安全序列,这时系统处于安全状态,不会进入死锁状态。
总结
行文至此已洋洋洒洒将近7万字,是我这两周的学习成果没错了,在此过程中学习了大量Blog以及《Java并发编程的艺术》,感觉和17年时候看待并发这件事发生了翻天覆地的变化,可以说透彻并深入的从底层内存模型并发机制到往上的JUC包的使用,整体算有个脉络了,一通百通,不光是Java,在并发编程中学到的一些思想其实可以复用到很多地方,例如Redis的跳表,数据库的排它锁、MVCC机制等。思想是相同的,只是实现不同而已,其实现在还有一些东西不够深入,例如内存屏障和指令级的变化,JUC包下的源码这些都还没研究透彻,之后自己有时间还是想好好研究研究,以上。