阻塞队列指的就是在队列的基础上附加了两个操作的队列。
两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
常见阻塞场景
- 当队列中没有数据的情况下,消费者端的所有线程都会被自动堵塞(挂起),直到有数据放入队列。
- 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
支持以上两种阻塞场景的被称为阻塞队列。
BlockingQueue 方法
放入数据:
- off (anObject) :表示如果可能的话,将addObject 加到 BlockingQueue里。即如果BlockingQueue 可以容纳,则返回 true,否则 返回false .(本方法不阻塞当前执行方法的线程)
- offer(E o,long timeout,TimeUnit unit):可以设定等待时间。如果在指定的时间内还不能忘队列中加入 BlockingQueue ,则返回失败。
- put(anObject): 把anObject 加到BlockQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续。
获取数据:
- poll(time) :取走 BlockingQueue 里排在首位的对象,若不能立即去除,则可以等 time 参数规定的时间,取不到是 返回 null.
- poll(long timeout,TimeUnit unit): 从BlockingQueue 取出一个队首的对象,如果在指定时间内,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
- take() : 取走BlockingQueue 里排在首位的对象。若 BlockingQueueue为空,则阻断进入等待状态,直道BlockingQueue 有新的数据被加入。
- drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定数据的个数).通过该方法,可以提升获取数据的效率;无序多次分批加速或释放锁。
Java中的阻塞队列
在java中提供了7个阻塞队列,分别如下
- ArrayBlockingQueue : 由数组结构组成的有界阻塞队列
- LinkedBlockingQueue: 由链表结构组成的有界阻塞队列
- PriorityBlockingQueue: 支持优先级排序的误解阻塞队列
- DelyQueue : 使用优先级队列实现的无界阻塞队列
- SynchronousQueue : 不存储元素的阻塞队列
- LinkedTransferQueue: 由链表结构组成的无界阻塞队列
- LinkedBlockingQueue: 由链表结构组成的双向阻塞队列。
ArryBlockingQueue
它是用数组实现的有界阻塞队列,并按照先进先出(FIFO) 的原则对元素进行排序。默认情况下不保证线程公平的访问队列。公平访问队列就是指阻塞的所有生产者线程或消费线程,当队列可用是,可以按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先往队列里插入元素;先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低通吐量。我们可以使用以下代码创建一个公平的阻塞队列。
ArrayBlockingQueue fairQueue=new ArrayBlockingQueue(2000,true);
LinkedBlockingQueue
它是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列(该队列由一个链表构成). 当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓冲在队列内部,而生产者立即返回;只有当队列缓冲区达到缓存容量的最大值是(LinkedBlockingQueue可以通过构造方法指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒。反之对于消费者这端的处理也基于同样的原理。
而LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步。 这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以此来提高整个队列的并发性能。
我们需要注意的是,如果构造一个 LinkedBlockingQueue 对象,而没有指定其容量大小,LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE).这样一来,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列。一般情况下,在处理多线程的 生产者-消费者问题是,使用这两个类足以。
Demo
//生产者 public class Shengchan implements Runnable{ private volatile boolean isRunnage =true; //是否在运行状态 private BlockingQueue<String> queue; //阻塞队列 //原子方式更新 private static AtomicInteger cout=new AtomicInteger(); //自动更新的值 private static final int De=1000; @Override public void run() { String data=null; Random r=new Random(); System.out.println("启动生产者线程"); while (isRunnage){ System.out.println("正在生产"); try { Thread.sleep(r.nextInt(De)); //取0-1000的一个随机数 data="data"+cout.incrementAndGet(); //以原子方式将 cout+1 System.out.println("将数据:"+data+"放入队列..."); if (!queue.offer(data,2, TimeUnit.SECONDS)){ //设定的等待时间,如果超过2S还没加进去返回true System.out.println("放入数据失败:"+data); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); }finally { System.out.println("退出生产者线程"); } } } public void stop(){ isRunnage=false; } public Shengchan(BlockingQueue<String> queue) { this.queue = queue; } }
//消费者 public class Xiaofeizhe implements Runnable{ private BlockingQueue<String> queue; private static final int DEFAULT=1000; //构造函数 public Xiaofeizhe(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("启动消费者线程"); Random r=new Random(); boolean isRunnag=true; while(isRunnag){ System.out.println("正在从队列获取数据"); try { String data=queue.poll(2, TimeUnit.SECONDS); //有数据是直接从队列的对首取走,无数据是阻塞,在2s内有数据,取走,超过2s还没数据,返回失败 if (data != null) { System.out.println("拿到数据"+data); System.out.println("正在消费数据"+data); Thread.sleep(r.nextInt(DEFAULT)); }else { //超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunnag=false; } } catch (InterruptedException e) { e.printStackTrace(); //中断线程 Thread.currentThread().interrupt(); }finally { System.out.println("退出消费者线程"); } } } }
//测试类 public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { //声明一个容量为10的缓存队列 BlockingQueue<String> queue=new LinkedBlockingDeque<>(10); //new了3个生产者和一个消费者 Shengchan s1=new Shengchan(queue); Shengchan s2=new Shengchan(queue); Shengchan s3=new Shengchan(queue); Xiaofeizhe x1=new Xiaofeizhe(queue); //借助Excutors //创建线程池 ExecutorService service= Executors.newCachedThreadPool(); service.execute(s1); service.execute(s2); service.execute(s3); service.execute(x1); //执行10s Thread.sleep(10*1000); s1.stop(); s2.stop(); s3.stop(); Thread.sleep(2000); //退出Excutor service.shutdown(); } }
PriorityBlockingQueue
它是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。这里可以自定义实现 comepareTo() 方法来指定元素进行排序规则;或者初始化PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但其不能保证痛优先级元素的顺序 .
DelayQueue
它是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。
SynchronousQueue
它是一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量是0,严格来说它并不是一种容器。由于队列没有容量,因此不能调用peek操作(返回队列的头元素)。
LinkedTransferQueue
它是一个由链表结构组成的无界阻塞TransferQueue队列。LinkedTransferQueue实现了一个重要的接口TransferQueue。该接口含有5个方法,其中有3个重要方法,分别如下。
- transfer(E e) :若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;如果没有消费者在等待接收数据,就会将元素插入到队列尾部,并且等待进入阻塞状态,知道有消费者线程取走该数据。
- tryTransfer(E e): 若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在,则返回 false ,并且不进入队列,这是一个不阻塞的操作。与 transfer 方法不同的是,tryTransfer方法无论消费者是否接受,其都会立即返回;而 transfer方法则是消费者接受了才返回。
- tryTransfer(E e,lomh timeout,TimeUnit unit): 若当前存在一个正在等待获取的消费者线程,则立即将元素传递给消费者;若不存在则将元素插入到队列尾部,并且等待消费者线程取走该元素。若在指定的超时时间内元素未被消费者线程获取,则返回 false ;若在指定的超时时间内其被消费者线程获取,则返回 true.
LinkBlockingDeque
它是一个由链表结构组成的双向阻塞队列。双向队列可以从队列的两端插入和移除元素,因此在多线程同时入队时,也就减少了一半的竞争。由于是双向的,因此 LinkedBlockingDeque 多了addFrist,addLast,offerFitrst,offerLast,peekFirst,peekLast等方法。其中,以Frist单词结尾的方法,表示插入,获取或移除双端队列的第一个元素;以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。
阻塞队列的实现原理
我们以ArryBlockingQueue 为例子
我们可以看到,ArrayBockingQueue 维护了一个 Object 类型的数组,takeIndex 和 putIndex 分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,接着往下看
lock是一个可重入锁,notEmpty和 notFull 是等待条件,接着我们看put 方法
它先获取了锁,并且是一个可中断锁,然后判断当前线程个数是否等于数组长度,如果相等,则嗲用 notFulawait()进行等待。当次线程被其他线程唤醒时,通过 enqueue(e) 方法插入元素,接着看 enqueue 方法
插入成功后,通过notEntry唤醒正在等待元素的线程。
下面再来看看take方法
和 put 方法类似,put 方法是等待 notFull 信号,而 Take是等待 notEmpty 的信号,在 take 方法中,如果可以取元素,则通过 dequeue 方法取得元素,下面看看dequeue 方法
和 enqueue 方法类似,在获取元素后,通过notFull 的signal 唤醒正在等待插入元素的线程。