- 原创 | Java 2021 超神之路,很肝~
- 中文详细注释的开源项目
- RPC 框架 Dubbo 源码解析
- 网络应用框架 Netty 源码解析
- 消息中间件 RocketMQ 源码解析
- 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析
- 作业调度中间件 Elastic-Job 源码解析
- 分布式事务中间件 TCC-Transaction 源码解析
- Eureka 和 Hystrix 源码解析
- Java 并发源码
ArrayList和LinkedList有什么区别?
这种侮辱人的问题,默认就把这两者限定在了同一个场景之中,它甚至连八股文都算不上。
一旦你被问到这种问题,也证明面试基本上泡汤了--面试官已经实在是找不到其他问题与你交流了。
你Over了。
但当我们细看一下LinkedList的class定义,就会发现,它并不像是ArrayList的那样具有纯洁的列表精神。
public class ArrayList<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable //VS public class LinkedList<E> extends AbstractSequentialList<E> implements List<E>, Deque<E>, Cloneable, java.io.Serializable
LinkedList除了能够当作普通的列表,它还是一个Deque。双向链表,听着就比较唬人,这就是一个既能当做队列、又能当做栈的存在。
有了这种双重Buff的叠加,LinkedList的应用场景比ArrayList丰富的多。除了能做最简单的LRU缓存,LinkedList在刷题的时候也是充满了正能量。
王者ConcurrentLinkedQueue,一个阻塞的双向队列,它的基本操作方法有:(3[基本]x2[异常与返回值]+4[阻塞加超时])x3[队头队尾]=5x2x3=30,足足有30个方法。看完上面的文章,这30个方法可以很快手到擒来。
不过我们今天要聊的一个重点,是使用Deque来实现更快的延迟队列。
延迟队列
如果你想要某些数据延迟一段时间再进行处理,或者要再某段时间内按照分组进行一些计算,那延迟队列无疑是非常合适的。
在Java的Concurrent包里,就静悄悄的躺着DelayQueue。只要你的数据实现了Delayed接口,那么就可以放心大胆的把它们往里面塞。
可惜的是,DelayQueue的底层存储,使用的是PriorityQueue。
PriorityQueue是堆实现的,offer和poll数据的时间复杂度是O(logN)。这就意味着,当DelayQueue中的数据比较多的时候,它的性能就会下降。
除了把数据分片,使用多个DelayQueue来完成工作,我们有没有速度更快的方法?比如把PriorityQueue使用LinkedList来替换?
这要看场景。
如果你向DelayQueue中添加数据,是与当前添加的时间相关的,那完全可以使用LinkedList来替换PriorityQueue。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
关键代码
要了解DelayQueue的运行原理,我们可以需要先看一下Delayed接口。任何想要存储到DelayQueue中的数据,都需要实现这个接口。
其中,getDelay就是用来判断当前数据是否超时的方法。而compareTo,则是PriorityQueue用来排序的,如果我们是按照当前塞入数据的,则compareTo方法就不是必要的。
public long getDelay(@NotNull TimeUnit unit) { return MAX_CACHE_DUAL + this.enqueueTimeNs - System.nanoTime(); } public int compareTo(@NotNull Delayed o) { long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (d == 0) ? 0 : (d < 0 ? -1 : 1); }
按照以上的思路,我们把DelayQueue的代码拷贝一份,仅保留关键代码,如下。
public class LightweightDelayedQueue<E extends Delayed> { private final transient ReentrantLock lock = new ReentrantLock(); private final LinkedList<E> q = new LinkedList<>(); private final Condition available = lock.newCondition(); private Thread leader; public void put(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } public int drainTo(Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (E first; n < maxElements && (first = q.peek()) != null && first.getDelay(NANOSECONDS) <= 0; ) { c.add(first); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } } }
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
主要方法
轻量级的延迟队列,如果一旦采用了LinkedList,那么它的入队、出队方法,就都变成了O(1)的时间复杂度。在延迟队列中的数据增加时,时间复杂度也能维持不变,可以说是速度快的连兔子都追不上了。
一般,在java中,put和take方法,都是代表阻塞性方法。比如,take方法,我们可以将其安全的阻塞在某个线程上,而不用担心太多的资源浪费。
final Thread thread = Thread.currentThread(); while (!this.close && !thread.isInterrupted()) { Data data = q.take(); }
这一切都是Condition办到的,它和wait、notify的作用是一样的。
当我们通过put方法添加新的数据到队列中,会通过signal方法,来通知等待的线程获取数据。
相同的,如果take方法发现队列中的数据为空,它将进入等待状态。如果有数据,也并不是马上把这些数据取出来,因为数据还没到期。比如最老的数据还剩下5秒才到期,代码也对这种情况进行了处理,它会尝试awaitNanos
对应的时间。
这样,我们就可以使用这种简单的轮询方式来实现延迟队列,而不需要单独的线程去检测队列中的数据。
增加take方法的效率
但是这样还不够。
当数据量比较大的时候,队列的数据可能有多条已经到期。如果我们通过take方法来一条一条获取的话,效率自然不如批量获取高。
drainTo方法,可以一股脑的把到期的数据转移到其他的集合中,但它并不是一个阻塞性的方法。
我们可以先使用take来阻塞线程,然后再批量取出所有数据。
下面代码,会一次性获取100条数据作为一个批量。
final Data takeItem = q.take(); final List<Data> buckets = new ArrayList<>(100); q.drainTo(buckets, 99); buckets.add(takeItem);
End
实际上,我们的某个业务,当采用LinkedList来替代PriorityQueue,并进行批量操作后,CPU的使用直接降低了1/3。
Deque是xjjdog最喜欢的一个接口。每当使用offerFirst、offerLast这样精准的API进行操作,都会体验到多巴胺的乐趣。LinkedList作为它的儿子,自然拥有了这些所有的功能。
当我们使用concurrent包里的基本API,对这些淳朴的工具进行封装,它们就会焕发出新的活力。