💪🏻 制定明确可量化的目标,坚持默默的做事。
一、继承实现关系图
二、低层数据存储结构
构造器:
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>(); }
说明:
- 默认底层使用栈存储数据结构
- 可传入true来使用队列作为底层数据结构
三、特点及优缺点
- SynchronousQueue 一个阻塞队列,其中每个put操作必须阻塞等待另一个线程take,反之先take操作也会进入自旋等待另一个线程put
- 同步队列没有任何内部容量,甚至没有一个容量
- 不允许空元素
- 在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务
- 支持公平策略和非公策略
- 不允许像ArrayBlockingQueue一样如peek()查看
- 不允许遍历
四、源码详解
核心方法
put和take都调 Transferer.transfer方法
4.1 TransferStack.transfer
非公平策略是TransferStack.transfer方法,源码及注释如下
/** * put <br/> * timed = false, nanos = 0 <br/> * take <br/> * timed = false, nanos = 0 <br/> */ E transfer(E e, boolean timed, long nanos) { /* * 循环尝试以下三种动作之一: * * 1.如果明显为空或已经包含相同模式的节点,则尝试将节点压入堆栈并等待匹配,返回它,或者如果取消则返回null。 * 2.如果明显包含互补模式节点,则尝试将满足节点推入堆栈,与相应的等待节点匹配,从堆栈中弹出,并返回匹配项。由于其他线程正在执行操作3,匹配或取消链接实际上可能并不需要 * 3.如果栈顶已经拥有另一个满足的节点,通过执行匹配和/或弹出操作来帮助它,然后继续。帮助的代码和满足的代码本质上是一样的,除非没有返回。 */ SNode s = null; // 如果e为null,则为REQUEST模式,否则为DATA模式 // 入口为put方法,e为非空;入口take方法,e为空 int mode = (e == null) ? REQUEST : DATA; for (;;) { // 头节点情况分类 // 1:为空,说明队列中还没有数据 // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据 // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据 SNode h = head; // 栈头为空,说明队列中还没有数据。 // 栈头非空且栈头的类型和本次操作一致 // 比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行 if (h == null || h.mode == mode) { // empty or same-mode // 设置了超时时间,并且 e 进栈或者出栈要超时了, // 就会丢弃本次操作,返回 null 值。 // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费 if (timed && nanos <= 0) { // 无法等待 // 栈头操作被取消 if (h != null && h.isCancelled()) // 丢弃栈头,把栈头的后一个元素作为栈头 casHead(h, h.next); // 将取消的节点弹栈 // 栈头为空,直接返回 null else return null; // 没有超时,直接把 e 作为新的栈头 } else if (casHead(h, s = snode(s, e, h, mode))) { // e 等待出栈,一种是空队列 take,一种是 put // take:阻塞取数据,当取到m时 m不为s // put:若调offer设置了超时间(超时未被取数据),且为超时返回时 m == s,则s应该被清除。如果未超时被取走了,则 m != s SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null; } // 进入这里逻辑,栈头已被取出,则s不再是栈头 // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } // 栈头正在等待其他线程 put 或 take // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处 } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈头已经被取消,把下一个元素作为栈头 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // m 就是栈头,通过上面 snode 方法刚刚赋值 SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // tryMatch 非常重要的方法,两个作用: // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性 // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
执行过程:
- 判断是 put 方法还是 take 方法
- 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
- 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
- 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
- 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
- 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
- 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。
4.2 awaitFulFill阻塞方法
/** * 旋转/阻止,直到节点s通过执行操作匹配。 * @param s 等待的节点 * @param timed true if timed wait * @param nanos 超时时间 * @return 匹配的节点, 或者是 s 如果被取消 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。 // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据 // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断 if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); // 超时了,取消当前线程的等待操作 if (nanos <= 0L) { s.tryCancel(); continue; } } // 自选次数减1 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) // park 阻塞 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
五、作用及使用场景
- 创建线程池 Executors.newCachedThreadPool使用SynchronousQueue
- 试课课程:试课结束前15分钟通知老师
- 空闲连接延迟自动关闭
- 缓存:超过时间自动清除
- 超时处理、业务办理排队叫号、插队和枪购活动等