首先来看一个例子,例子来源于网上:
/*** 多线程模拟实现生产者/消费者模型* */publicclassBlockingQueueTest2 { /*** * 定义装苹果的篮子* */publicclassBasket { // 篮子,能够容纳3个苹果BlockingQueue<String>basket=newLinkedBlockingQueue<String>(3); // 生产苹果,放入篮子publicvoidproduce() throwsInterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple"); } // 消费苹果,从篮子中取走publicStringconsume() throwsInterruptedException { // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)returnbasket.take(); } } // 定义苹果生产者classProducerimplementsRunnable { privateStringinstance; privateBasketbasket; publicProducer(Stringinstance, Basketbasket) { this.instance=instance; this.basket=basket; } publicvoidrun() { try { while (true) { // 生产苹果System.out.println("生产者准备生产苹果:"+instance); basket.produce(); System.out.println("!生产者生产苹果完毕:"+instance); // 休眠300msThread.sleep(300); } } catch (InterruptedExceptionex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者classConsumerimplementsRunnable { privateStringinstance; privateBasketbasket; publicConsumer(Stringinstance, Basketbasket) { this.instance=instance; this.basket=basket; } publicvoidrun() { try { while (true) { // 消费苹果System.out.println("消费者准备消费苹果:"+instance); System.out.println(basket.consume()); System.out.println("!消费者消费苹果完毕:"+instance); // 休眠1000msThread.sleep(1000); } } catch (InterruptedExceptionex) { System.out.println("Consumer Interrupted"); } } } publicstaticvoidmain(String[] args) { BlockingQueueTest2test=newBlockingQueueTest2(); // 建立一个装苹果的篮子Basketbasket=test.newBasket(); ExecutorServiceservice=Executors.newCachedThreadPool(); Producerproducer=test.newProducer("生产者001", basket); Producerproducer2=test.newProducer("生产者002", basket); Consumerconsumer=test.newConsumer("消费者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行5s后,所有任务停止// try {// Thread.sleep(1000 * 5);// } catch (InterruptedException e) {// e.printStackTrace();// }// service.shutdownNow(); } }
采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:
采用链表数据结构Node的方式进行节点数据的记录,
同时其进行入队和出队的计数器采用原子性的AtomicInteger
其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock
其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。
1.相关变量
//容量,为空时使用Integer.MAX_VALUE=2^31-1privatefinalintcapacity; /** Current number of elements *///计数,队列中的元素个数privatefinalAtomicIntegercount=newAtomicInteger(); //头结点,head.item==null,首节点不存放元素transientNode<E>head; //尾节点,last.next==nullprivatetransientNode<E>last; /** Lock held by take, poll, etc *///消费队列锁privatefinalReentrantLocktakeLock=newReentrantLock(); /** Wait queue for waiting takes *///消费队列等待消费,用于队满时,进行消费privatefinalConditionnotEmpty=takeLock.newCondition(); /** Lock held by put, offer, etc *///生产队列锁privatefinalReentrantLockputLock=newReentrantLock(); /** Wait queue for waiting puts *///生产队列等待生产,用于队空时,进行生产privatefinalConditionnotFull=putLock.newCondition(); //节点信息:数据、后继点击staticclassNode<E> { Eitem; /*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*///下一个节点,分为三种情况:// 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点Node<E>next; Node(Ex) { item=x; } }
2.构造方法
//构造方法,空参构造默认队列容量为2^31-1publicLinkedBlockingQueue() { this(Integer.MAX_VALUE); } //构造方法,带指定容量publicLinkedBlockingQueue(intcapacity) { //对容量进行校验if (capacity<=0) thrownewIllegalArgumentException(); this.capacity=capacity; //创建节点信息last=head=newNode<E>(null); } //构造方法,放入带指定集合的元素信息入队//首先采用默认大小,进行上锁操作,// 放入元素到队列中,进行遍历,放入publicLinkedBlockingQueue(Collection<?extendsE>c) { //默认队列大小,2^31-1this(Integer.MAX_VALUE); //进行上锁操作finalReentrantLockputLock=this.putLock; putLock.lock(); // Never contended, but necessary for visibilitytry { //放入元素,进行计数intn=0; for (Ee : c) { if (e==null) thrownewNullPointerException(); if (n==capacity) thrownewIllegalStateException("Queue full"); enqueue(newNode<E>(e)); ++n; } count.set(n); } finally { //释放锁putLock.unlock(); } }
3.方法
生产方法
put操作
//入队操作//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,// 如果队列没有满,则进行生产操作,同时计数器进行计数//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程publicvoidput(Ee) throwsInterruptedException { //非空校验if (e==null) thrownewNullPointerException(); // Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.//设置计数为0,失败的时候返回intc=-1; Node<E>node=newNode<E>(e); finalReentrantLockputLock=this.putLock; finalAtomicIntegercount=this.count; //中断上锁putLock.lockInterruptibly(); try { /** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*///检查队列是否满了,满了进行阻塞操作while (count.get() ==capacity) { notFull.await(); } //入队操作,将节点信息插入到队尾//last=last.next=nodeenqueue(node); c=count.getAndIncrement(); //元素没有满,则唤醒被阻塞的线程,增加线程if (c+1<capacity) notFull.signal(); } finally { //释放锁putLock.unlock(); } //插入的是一个元素时唤醒阻塞等待的线程if (c==0) signalNotEmpty(); }
offer操作
//阻塞带超时时间的offer操作publicbooleanoffer(Ee, longtimeout, TimeUnitunit) throwsInterruptedException { if (e==null) thrownewNullPointerException(); longnanos=unit.toNanos(timeout); intc=-1; finalReentrantLockputLock=this.putLock; finalAtomicIntegercount=this.count; putLock.lockInterruptibly(); try { while (count.get() ==capacity) { //如果时间<0,则表示超时返回了,此时队列未满,直接返回if (nanos<=0) returnfalse; nanos=notFull.awaitNanos(nanos); } //否者进行入队操作enqueue(newNode<E>(e)); c=count.getAndIncrement(); if (c+1<capacity) notFull.signal(); } finally { putLock.unlock(); } if (c==0) signalNotEmpty(); returntrue; } //首先进行非空校验,如果队满了,直接返回false//如果没有满,则进行上锁,同时进行判断,// 如果计数<容量,则进行入队操作//最后释放锁publicbooleanoffer(Ee) { if (e==null) thrownewNullPointerException(); finalAtomicIntegercount=this.count; if (count.get() ==capacity) returnfalse; intc=-1; Node<E>node=newNode<E>(e); finalReentrantLockputLock=this.putLock; putLock.lock(); try { if (count.get() <capacity) { enqueue(node); c=count.getAndIncrement(); if (c+1<capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c==0) signalNotEmpty(); returnc>=0; }
消费者操作
take操作
//take操作 消费消息//如果队列为非空或者被唤醒,进行消费操作,计数器-1publicEtake() throwsInterruptedException { Ex; intc=-1; finalAtomicIntegercount=this.count; finalReentrantLocktakeLock=this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() ==0) { notEmpty.await(); } x=dequeue(); c=count.getAndDecrement(); if (c>1) notEmpty.signal(); } finally { takeLock.unlock(); } //开始消费if (c==capacity) signalNotFull(); returnx; }
pull操作
//进行消费操作 poll,带超时时间publicEpoll(longtimeout, TimeUnitunit) throwsInterruptedException { Ex=null; intc=-1; longnanos=unit.toNanos(timeout); finalAtomicIntegercount=this.count; finalReentrantLocktakeLock=this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() ==0) { if (nanos<=0) returnnull; nanos=notEmpty.awaitNanos(nanos); } x=dequeue(); c=count.getAndDecrement(); if (c>1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c==capacity) signalNotFull(); returnx; } //进行poll操作publicEpoll() { finalAtomicIntegercount=this.count; if (count.get() ==0) returnnull; Ex=null; intc=-1; finalReentrantLocktakeLock=this.takeLock; takeLock.lock(); try { if (count.get() >0) { x=dequeue(); c=count.getAndDecrement(); if (c>1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c==capacity) signalNotFull(); returnx; }
remove操作
//删除操作,释放指定节点信息publicbooleanremove(Objecto) { if (o==null) returnfalse; //对生产消息和消费消息进行上锁fullyLock(); try { for (Node<E>trail=head, p=trail.next; p!=null; trail=p, p=p.next) { if (o.equals(p.item)) { //释放节点unlink(p, trail); returntrue; } } returnfalse; } finally { fullyUnlock(); } }
drainTo操作
publicintdrainTo(Collection<?superE>c) { returndrainTo(c, Integer.MAX_VALUE); } //一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销//其中c和maxElement表示返回的集合、要获取的元素个数publicintdrainTo(Collection<?superE>c, intmaxElements) { if (c==null) thrownewNullPointerException(); if (c==this) thrownewIllegalArgumentException(); if (maxElements<=0) return0; booleansignalNotFull=false; //进行上锁finalReentrantLocktakeLock=this.takeLock; takeLock.lock(); try { //拿到两者之间的最小的一个intn=Math.min(maxElements, count.get()); // count.get provides visibility to first n NodesNode<E>h=head; inti=0; try { //将元素添加中集合c中while (i<n) { Node<E>p=h.next; c.add(p.item); p.item=null; h.next=h; h=p; ++i; } returnn; } finally { // Restore invariants even if c.add() threwif (i>0) { // assert h.item == null;head=h; signalNotFull= (count.getAndAdd(-i) ==capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }