LinkedBlockingQueue源码学习

简介: 采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:采用链表数据结构Node的方式进行节点数据的记录,同时其进行入队和出队的计数器采用原子性的AtomicInteger其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消

首先来看一个例子,例子来源于网上:

/*** 多线程模拟实现生产者/消费者模型*  */publicclassBlockingQueueTest2 {
/*** * 定义装苹果的篮子* */publicclassBasket {
// 篮子,能够容纳3个苹果BlockingQueue<String>basket=newLinkedBlockingQueue<String>(3);
// 生产苹果,放入篮子publicvoidproduce() throwsInterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple");
        }
// 消费苹果,从篮子中取走publicStringconsume() throwsInterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)returnbasket.take();
        }
    }
// 定义苹果生产者classProducerimplementsRunnable {
privateStringinstance;
privateBasketbasket;
publicProducer(Stringinstance, Basketbasket) {
this.instance=instance;
this.basket=basket;
        }
publicvoidrun() {
try {
while (true) {
// 生产苹果System.out.println("生产者准备生产苹果:"+instance);
basket.produce();
System.out.println("!生产者生产苹果完毕:"+instance);
// 休眠300msThread.sleep(300);
                }
            } catch (InterruptedExceptionex) {
System.out.println("Producer Interrupted");
            }
        }
    }
// 定义苹果消费者classConsumerimplementsRunnable {
privateStringinstance;
privateBasketbasket;
publicConsumer(Stringinstance, Basketbasket) {
this.instance=instance;
this.basket=basket;
        }
publicvoidrun() {
try {
while (true) {
// 消费苹果System.out.println("消费者准备消费苹果:"+instance);
System.out.println(basket.consume());
System.out.println("!消费者消费苹果完毕:"+instance);
// 休眠1000msThread.sleep(1000);
                }
            } catch (InterruptedExceptionex) {
System.out.println("Consumer Interrupted");
            }
        }
    }
publicstaticvoidmain(String[] args) {
BlockingQueueTest2test=newBlockingQueueTest2();
// 建立一个装苹果的篮子Basketbasket=test.newBasket();
ExecutorServiceservice=Executors.newCachedThreadPool();
Producerproducer=test.newProducer("生产者001", basket);
Producerproducer2=test.newProducer("生产者002", basket);
Consumerconsumer=test.newConsumer("消费者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行5s后,所有任务停止//        try {//            Thread.sleep(1000 * 5);//        } catch (InterruptedException e) {//            e.printStackTrace();//        }//        service.shutdownNow();    }
}

采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

采用链表数据结构Node的方式进行节点数据的记录,

同时其进行入队和出队的计数器采用原子性的AtomicInteger

其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock

其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。


1.相关变量

//容量,为空时使用Integer.MAX_VALUE=2^31-1privatefinalintcapacity;
/** Current number of elements *///计数,队列中的元素个数privatefinalAtomicIntegercount=newAtomicInteger();
//头结点,head.item==null,首节点不存放元素transientNode<E>head;
//尾节点,last.next==nullprivatetransientNode<E>last;
/** Lock held by take, poll, etc *///消费队列锁privatefinalReentrantLocktakeLock=newReentrantLock();
/** Wait queue for waiting takes *///消费队列等待消费,用于队满时,进行消费privatefinalConditionnotEmpty=takeLock.newCondition();
/** Lock held by put, offer, etc *///生产队列锁privatefinalReentrantLockputLock=newReentrantLock();
/** Wait queue for waiting puts *///生产队列等待生产,用于队空时,进行生产privatefinalConditionnotFull=putLock.newCondition();
//节点信息:数据、后继点击staticclassNode<E> {
Eitem;
/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*///下一个节点,分为三种情况:// 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点Node<E>next;
Node(Ex) { item=x; }
}



2.构造方法

//构造方法,空参构造默认队列容量为2^31-1publicLinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//构造方法,带指定容量publicLinkedBlockingQueue(intcapacity) {
//对容量进行校验if (capacity<=0) thrownewIllegalArgumentException();
this.capacity=capacity;
//创建节点信息last=head=newNode<E>(null);
}
//构造方法,放入带指定集合的元素信息入队//首先采用默认大小,进行上锁操作,// 放入元素到队列中,进行遍历,放入publicLinkedBlockingQueue(Collection<?extendsE>c) {
//默认队列大小,2^31-1this(Integer.MAX_VALUE);
//进行上锁操作finalReentrantLockputLock=this.putLock;
putLock.lock(); // Never contended, but necessary for visibilitytry {
//放入元素,进行计数intn=0;
for (Ee : c) {
if (e==null)
thrownewNullPointerException();
if (n==capacity)
thrownewIllegalStateException("Queue full");
enqueue(newNode<E>(e));
++n;
        }
count.set(n);
    } finally {
//释放锁putLock.unlock();
    }
}


3.方法

生产方法

put操作

//入队操作//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,// 如果队列没有满,则进行生产操作,同时计数器进行计数//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程publicvoidput(Ee) throwsInterruptedException {
//非空校验if (e==null) thrownewNullPointerException();
// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.//设置计数为0,失败的时候返回intc=-1;
Node<E>node=newNode<E>(e);
finalReentrantLockputLock=this.putLock;
finalAtomicIntegercount=this.count;
//中断上锁putLock.lockInterruptibly();
try {
/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*///检查队列是否满了,满了进行阻塞操作while (count.get() ==capacity) {
notFull.await();
        }
//入队操作,将节点信息插入到队尾//last=last.next=nodeenqueue(node);
c=count.getAndIncrement();
//元素没有满,则唤醒被阻塞的线程,增加线程if (c+1<capacity)
notFull.signal();
    } finally {
//释放锁putLock.unlock();
    }
//插入的是一个元素时唤醒阻塞等待的线程if (c==0)
signalNotEmpty();
}


offer操作

//阻塞带超时时间的offer操作publicbooleanoffer(Ee, longtimeout, TimeUnitunit)
throwsInterruptedException {
if (e==null) thrownewNullPointerException();
longnanos=unit.toNanos(timeout);
intc=-1;
finalReentrantLockputLock=this.putLock;
finalAtomicIntegercount=this.count;
putLock.lockInterruptibly();
try {
while (count.get() ==capacity) {
//如果时间<0,则表示超时返回了,此时队列未满,直接返回if (nanos<=0)
returnfalse;
nanos=notFull.awaitNanos(nanos);
        }
//否者进行入队操作enqueue(newNode<E>(e));
c=count.getAndIncrement();
if (c+1<capacity)
notFull.signal();
    } finally {
putLock.unlock();
    }
if (c==0)
signalNotEmpty();
returntrue;
}
//首先进行非空校验,如果队满了,直接返回false//如果没有满,则进行上锁,同时进行判断,// 如果计数<容量,则进行入队操作//最后释放锁publicbooleanoffer(Ee) {
if (e==null) thrownewNullPointerException();
finalAtomicIntegercount=this.count;
if (count.get() ==capacity)
returnfalse;
intc=-1;
Node<E>node=newNode<E>(e);
finalReentrantLockputLock=this.putLock;
putLock.lock();
try {
if (count.get() <capacity) {
enqueue(node);
c=count.getAndIncrement();
if (c+1<capacity)
notFull.signal();
        }
    } finally {
putLock.unlock();
    }
if (c==0)
signalNotEmpty();
returnc>=0;
}


消费者操作

take操作

//take操作 消费消息//如果队列为非空或者被唤醒,进行消费操作,计数器-1publicEtake() throwsInterruptedException {
Ex;
intc=-1;
finalAtomicIntegercount=this.count;
finalReentrantLocktakeLock=this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() ==0) {
notEmpty.await();
        }
x=dequeue();
c=count.getAndDecrement();
if (c>1)
notEmpty.signal();
    } finally {
takeLock.unlock();
    }
//开始消费if (c==capacity)
signalNotFull();
returnx;
}


pull操作

//进行消费操作 poll,带超时时间publicEpoll(longtimeout, TimeUnitunit) throwsInterruptedException {
Ex=null;
intc=-1;
longnanos=unit.toNanos(timeout);
finalAtomicIntegercount=this.count;
finalReentrantLocktakeLock=this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() ==0) {
if (nanos<=0)
returnnull;
nanos=notEmpty.awaitNanos(nanos);
        }
x=dequeue();
c=count.getAndDecrement();
if (c>1)
notEmpty.signal();
    } finally {
takeLock.unlock();
    }
if (c==capacity)
signalNotFull();
returnx;
}
//进行poll操作publicEpoll() {
finalAtomicIntegercount=this.count;
if (count.get() ==0)
returnnull;
Ex=null;
intc=-1;
finalReentrantLocktakeLock=this.takeLock;
takeLock.lock();
try {
if (count.get() >0) {
x=dequeue();
c=count.getAndDecrement();
if (c>1)
notEmpty.signal();
            }
        } finally {
takeLock.unlock();
        }
if (c==capacity)
signalNotFull();
returnx;
    }


remove操作

//删除操作,释放指定节点信息publicbooleanremove(Objecto) {
if (o==null) returnfalse;
//对生产消息和消费消息进行上锁fullyLock();
try {
for (Node<E>trail=head, p=trail.next;
p!=null;
trail=p, p=p.next) {
if (o.equals(p.item)) {
//释放节点unlink(p, trail);
returntrue;
            }
        }
returnfalse;
    } finally {
fullyUnlock();
    }
}


drainTo操作

publicintdrainTo(Collection<?superE>c) {
returndrainTo(c, Integer.MAX_VALUE);
    }
//一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销//其中c和maxElement表示返回的集合、要获取的元素个数publicintdrainTo(Collection<?superE>c, intmaxElements) {
if (c==null)
thrownewNullPointerException();
if (c==this)
thrownewIllegalArgumentException();
if (maxElements<=0)
return0;
booleansignalNotFull=false;
//进行上锁finalReentrantLocktakeLock=this.takeLock;
takeLock.lock();
try {
//拿到两者之间的最小的一个intn=Math.min(maxElements, count.get());
// count.get provides visibility to first n NodesNode<E>h=head;
inti=0;
try {
//将元素添加中集合c中while (i<n) {
Node<E>p=h.next;
c.add(p.item);
p.item=null;
h.next=h;
h=p;
++i;
                }
returnn;
            } finally {
// Restore invariants even if c.add() threwif (i>0) {
// assert h.item == null;head=h;
signalNotFull= (count.getAndAdd(-i) ==capacity);
                }
            }
        } finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
        }
    }
目录
相关文章
|
存储 安全 Java
【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别
【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别
|
8月前
|
存储 负载均衡 安全
Java并发基础:ArrayBlockingQueue全面解析!
ArrayBlockingQueue类是一个高效、线程安全的队列实现,它基于数组,提供了快速的元素访问,并支持多线程间的同步操作,作为有界队列,它能有效防止内存溢出,并通过阻塞机制平衡生产者和消费者的速度差异,它还提供了公平性和非公平性策略,满足不同场景下的需求。
148 1
Java并发基础:ArrayBlockingQueue全面解析!
|
5月前
ArrayBlockingQueue原理解析
该文章主要讲述了ArrayBlockingQueue的实现原理。
|
8月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
227 1
|
8月前
|
缓存 安全 Java
Java并发基础:SynchronousQueue全面解析!
SynchronousQueue的优点在于其直接性和高效性,它实现了线程间的即时数据交换,无需中间缓存,确保了数据传输的实时性和准确性,同时,其灵活的阻塞机制使得线程同步变得简单而直观,适用于需要精确协调的生产者-消费者模型。
184 0
Java并发基础:SynchronousQueue全面解析!
|
8月前
|
存储 监控 安全
Java并发基础:LinkedBlockingQueue全面解析!
LinkedBlockingQueue类是以链表结构实现高效线程安全队列,具有出色的并发性能、灵活的阻塞与非阻塞操作,以及适用于生产者和消费者模式的能力,此外,LinkedBlockingQueue还具有高度的可伸缩性,能够在多线程环境中有效管理数据共享,是提升程序并发性能和稳定性的关键组件。
193 0
Java并发基础:LinkedBlockingQueue全面解析!
|
存储 安全 Java
LinkedBlockingQueue 原理
LinkedBlockingQueue 原理
|
存储 安全 Java
《从面试题来看源码》-LinkedBlockingQueue 源码分析
《从面试题来看源码》-LinkedBlockingQueue 源码分析
《从面试题来看源码》-LinkedBlockingQueue 源码分析
|
存储 缓存 资源调度
线程池 ThreadPoolExecutor 原理及源码笔记
前面在学习 JUC 源码时,很多代码举例中都使用了线程池 ThreadPoolExecutor,并且在工作中也经常用到线程池,所以现在就一步一步看看,线程池的源码,了解其背后的核心原理。
180 0
|
Java
【JUC】JDK1.8源码分析之LinkedBlockingQueue(四)
 分析完了ArrayBlockingQueue后,接着分析LinkedBlockingQueue,与ArrayBlockingQueue不相同,LinkedBlockingQueue底层采用的是链表结构,其源码也相对比较简单,下面进行正式的分析。
84 0
【JUC】JDK1.8源码分析之LinkedBlockingQueue(四)