并发编程从入门到放弃系列开始和结束(五)

简介: 对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。

阻塞队列

并发编程中,队列是其中不可缺少的一环,其实前面在说到线程池的时候,就已经提及到了阻塞队列了,这里我们要一起看看 JUC 包下提供的这些队列。

82245bcdda81b4cd09f37655759efa3b.jpg阻塞队列

阻塞队列中的阻塞包含两层意思:

  1. 插入的时候,如果阻塞队列满,插入元素阻塞
  2. 删除/查询的时候,如果阻塞队列空,删除/查询元素阻塞

下面列出队列的一些插入和删除元素的方法,一个个来说:

add:向队列尾部插入元素,插入成功返回 true,队列满则抛出IllegalStateException("Queue full")异常

offer:向队列尾部插入元素,队列满返回 false,否则返回 true,带超时的则是会阻塞,达到超时时间后返回

put:向队列尾部插入元素,队列满会一直阻塞

remove:删除队列头部元素,删除成功返回 true,队列空则抛出NoSuchElementException异常

poll:删除队列头部元素,删除成功返回队列头部元素,队列空返回null,带超时的则是会阻塞,达到超时时间后返回

take:删除队列头部元素,队列空会一直阻塞

element:查询队列头部元素,并且返回,队列空则抛出NoSuchElementException异常

peek:查询队列头部元素,并且返回

6b14206e53b14647c86c253cdb33d7ea.jpg

ArrayBlockingQueue

ArrayBlockingQueue 从名字就知道,基于数组实现的有界阻塞队列,基于AQS支持公平和非公平策略。

还是看构造函数吧,可以传入初始数组大小,一旦设置之后大小就不能改变了,传参可以支持公平和非公平,最后一个构造函数可以支持传入集合进行初始化,但是长度不能超过 capacity,否则抛出ArrayIndexOutOfBoundsException异常。

public ArrayBlockingQueue(int capacity);
public ArrayBlockingQueue(int capacity, boolean fair);
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);

这个其实在上面介绍 Condition 的时候我们就已经实现过他了,这里就不再说了,可以参考上面 Condition 的部分。

LinkedBlockingQueue


LinkedBlockingQueue 基于链表实现的有界阻塞队列。

使用无参构造函数则链表长度为 Integer.MAX_VALUE,另外两个构造函数和 ArrayBlockingQueue 差不多。

public LinkedBlockingQueue();
public LinkedBlockingQueue(int capacity);
public LinkedBlockingQueue(Collection<? extends E> c);

我们可以看看 put 和 take 的源码。

  1. 首先加锁中断
  2. 然后判断如果达到了队列的最大长度,那么就阻塞等待,否则就把元素插入到队列的尾部
  3. 注意这里和 ArrayBlockingQueue 有个区别,这里再次做了一次判断,如果队列没满,唤醒因为 put 阻塞的线程,为什么要做判断,因为他们不是一把锁
  4. 最后的逻辑是一样的,notEmpty 唤醒
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
   // assert putLock.isHeldByCurrentThread();
   // assert last.next == null;
   last = last.next = node;
}
private void signalNotEmpty() {
   final ReentrantLock takeLock = this.takeLock;
   takeLock.lock();
   try {
     notEmpty.signal();
   } finally {
     takeLock.unlock();
   }
}

take的逻辑也是非常类似啊。

  1. 加锁中断
  2. 判断队列是不是空了,空了的话就阻塞等待,否则就从队列移除一个元素
  3. 然后再次做一次判断,队列要是不空,就唤醒阻塞的线程
  4. 最后唤醒 notFull 的线程
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
   // assert takeLock.isHeldByCurrentThread();
   // assert head.item == null;
   Node<E> h = head;
   Node<E> first = h.next;
   h.next = h; // help GC
   head = first;
   E x = first.item;
   first.item = null;
   return x;
}
private void signalNotFull() {
   final ReentrantLock putLock = this.putLock;
   putLock.lock();
   try {
     notFull.signal();
   } finally {
     putLock.unlock();
   }
}

PriorityBlockingQueue


PriorityBlockingQueue 是支持优先级的无界阻塞队列,默认排序按照自然排序升序排列。

几个构造函数,无参构造函数初始容量为11,可以自定义,也可以在创建的时候传入 comparator 自定义排序规则。

public PriorityBlockingQueue();
public PriorityBlockingQueue(int initialCapacity);
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator);
public PriorityBlockingQueue(Collection<? extends E> c);

直接看 put 和 take 方法吧,后面都这样,其他的就忽略好了,找到 put 之后,发现直接就是调用的 offer,那我们就直接看 offer 的实现。

  1. 首先还是加锁,然后看当前元素个数是否达到了数组的上限,到了就调用 tryGrow 去扩容。
  2. 看是否实现了 Comparator 接口,是的话就用 Comparator 去排序,否则就用 Comparable 去比较,如果两个都没有,会报错
  3. notEmpty 唤醒,最后解锁
public void put(E e) {
  offer(e); // never need to block
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

这里,我们要继续关注一下这个扩容的逻辑,到底是怎么处理的?代码不长,但是看着很方的样子。

  1. 首先,先释放锁,因为下面用 CAS 处理,估计怕扩容时间太长阻塞的线程太多
  2. 然后 CAS 修改 allocationSpinLock 为1
  3. CAS 成功的话,进行扩容的逻辑,如果长度小于64就扩容一倍,否则扩容一半
  4. 之前我们说他无界,其实不太对,这里就判断是否超过了最大长度,MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8,判断一下有可能会抛出内存溢出异常
  5. 然后创建一个新的对象数组,并且 allocationSpinLock 重新恢复为0
  6. 执行了一次 Thread.yield(),让出 CPU,因为有可能其他线程正在扩容,让大家争抢一下
  7. 最后确保新的对象数组创建成功了,也就是扩容是没有问题的,再次加锁,数组拷贝,结束
private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
}

take 的逻辑基本一样,最多有个排序的逻辑在里面,就不再多说了。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

DelayQueue


DelayQueue 是支持延时的无界阻塞队列,这个在我们聊 ScheduledThreadPoolExecutor 也谈到过,里面也使用了延迟队列,只不过是它自己的一个内部类,DelayQueue 内部其实使用 PriorityQueue 来实现。

DelayQueue 的用法是添加元素的时候可以设置一个延迟时间,当时间到了之后才能从队列中取出来,使用 DelayQueue 中的对象必须实现 Delayed 接口,重写 getDelay 和  compareTo 方法,就像这样,那实现其实可以看 ScheduledThreadPoolExecutor 里面是怎么做的,这里我就不管那么多,示意一下就好了。

public class Test {
    public static void main(String[] args) throws Exception {
        DelayQueue<User> delayQueue = new DelayQueue<>();
        delayQueue.put(new User(1, "a"));
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User implements Delayed {
        private Integer id;
        private String username;
        @Override
        public long getDelay(TimeUnit unit) {
            return 0;
        }
        @Override
        public int compareTo(Delayed o) {
            return 0;
        }
    }
}

我们可以看看他的属性和构造函数,呐看到了吧,使用的 PriorityQueue,另外构造函数比较简单了,不说了。

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
public DelayQueue();
public DelayQueue(Collection<? extends E> c);

OK,没啥毛病,这里我们要先看 take 方法,不能先看 put,否则我觉得闹不明白。

  1. 来第一步加锁,如果头结点是空的,也就是队列是空的话,阻塞,没啥好说的
  2. 反之队列有东西,我们就要去取了嘛,但是这里要看对象自己实现的 getDelay 方法获得延迟的时间,如果延迟的时间小于0,那说明到时间了,可以执行了,poll 返回
  3. 第一次,leader 线程肯定是空的,线程阻塞 delay 的时间之后才开始执行,完全没毛病,然后 leader 重新 置为 null
  4. 当 leader 不是 null 的时候,说明其他线程在操作了,所以阻塞等待唤醒
  5. 最后,leader 为 null,唤醒阻塞中的线程,解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

然后再来看 put 就会简单多了,put 还是直接调用的 offer,看 offer 方法。

这里使用的是 PriorityQueue 的 offer 方法,其实和我们上面说到的 PriorityBlockingQueue 差不多,不再多说了,添加到队列头部之后,leader 置为 null,唤醒,结束了。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

SynchronousQueue&LinkedTransferQueue


为什么这两个放一起说呢。。。因为这源码真的不想在这里说一遍,这俩源码可以单独出一个专题来写,长篇精悍文章不适合他他们,就简单先了解下。

SynchronousQueue 是一个不存储元素的阻塞队列,每个 put  必须等待 take,否则不能继续添加元素。

如果你还记得我们上面说到线程池的地方,newCachedThreadPool 默认就是使用的 SynchronousQueue。

他就两个构造方法,你一看就知道,对吧,支持公平和非公平,当然你也别问默认是啥,问就是非公平。

public SynchronousQueue();
public SynchronousQueue(boolean fair);

主要靠内部抽象类 Transferer,他的实现主要有两个,TransferQueue 和 TransferStack。

注意:如果是公平模式,使用的是 TransferQueue 队列,非公平则使用 TransferStack 栈。

abstract static class Transferer<E> {
 abstract E transfer(E e, boolean timed, long nanos);
}

LinkedTransferQueue 是链表组成的无界阻塞队列,看他内部类就知道了,这是个链表实现。

static final class Node {
    final boolean isData;   // 标记生产者或者消费者
    volatile Object item;   // 值
    volatile Node next;   // 下一个节点
    volatile Thread waiter;
}

LinkedBlockingDeque


LinkedBlockingDeque 是链表组成的双向阻塞队列,它支持从队列的头尾进行进行插入和删除元素。

构造函数有3个,不传初始容量就是 Integer 最大值。

public LinkedBlockingDeque() {
 this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity);
public LinkedBlockingDeque(Collection<? extends E> c);

看下双向链表的结构:

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

因为是双向链表,所以比其他的队列多了一些方法,比如 add、addFirst、addLast,add 其实就是 addLast,offer、put 也是类似。

我们可以区分看一下 putFirst 和 putLast ,主要区别就是 linkFirst 和 linkLast,分别去队列头部和尾部添加新节点,其他基本一致。

public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
}
public void putLast(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
   Node<E> node = new Node<E>(e);
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
    while (!linkLast(node))
     notFull.await();
   } finally {
    lock.unlock();
   }
}

结尾

本次长篇内容参考书籍和文档

  1. Java 并发编程的艺术
  2. Java 并发编程之美
  3. Java 并发编程实战
  4. Java 8实战
  5. 极客时间:Java 并发编程实战
相关文章
|
8月前
|
NoSQL 前端开发 Java
剑指JUC原理-20.并发编程实践(中)
剑指JUC原理-20.并发编程实践
73 0
|
8月前
|
消息中间件 存储 Java
剑指JUC原理-20.并发编程实践(下)
剑指JUC原理-20.并发编程实践
68 0
|
存储 缓存 算法
并发编程系列教程(12) - Disruptor框架
并发编程系列教程(12) - Disruptor框架
102 0
|
8月前
|
消息中间件 canal Java
剑指JUC原理-20.并发编程实践(上)
剑指JUC原理-20.并发编程实践
70 0
|
算法 编译器 程序员
[笔记]C++并发编程实战 《一》你好,C++的并发世界(二)
[笔记]C++并发编程实战 《一》你好,C++的并发世界(二)
|
存储 缓存 算法
并发编程基础
并发编程基础
|
Java 调度
并发编程从入门到放弃系列开始和结束(一)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
1643 3
并发编程从入门到放弃系列开始和结束(一)
|
安全 Java 索引
并发编程从入门到放弃系列开始和结束(三)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
并发编程从入门到放弃系列开始和结束(三)
|
缓存 安全 Java
并发编程从入门到放弃系列开始和结束(二)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
并发编程从入门到放弃系列开始和结束(二)