学习ConcurrentLinkedQueue

简介: 原因:学习ConcurrentLinkedQueue是看到akka框架的默认邮箱是使用ConcurrentLinkedQueue实现的。   1. ConcurrentLinkedQueue在java.

 

原因:学习ConcurrentLinkedQueue是看到akka框架的默认邮箱是使用ConcurrentLinkedQueue实现的。

 

1. ConcurrentLinkedQueue在java.util.concurrent包中(java 版本是1.7.0_71),类间集成关系如下:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

ConcurrentLinkedQueue继承了抽象类AbstractQueue,AbstractQueue抽象类中的几个实现方法也都是利用Queue接口中的方法实现的。

Queue接口中定义的抽象方法有:

package java.util;

public interface Queue<E> extends Collection<E> {

    // 向队列中插入元素e,不验证队列空间限制条件下插入一个元素。如果队列有剩余空间,直接插入;如果队列满了,就抛出IllegalStateException异常
    boolean add(E e);
    
    // 同样是向队列中插入元素e。如果队列有空间限制,同add;如果队列没有空间限制,比如ConcurrentLinkedQueue,总是可以插入进去
    boolean offer(E e);
    
    // 返回并删除队列头部的第一个元素,remove()与poll()方法的不同在于,如果队列为空,remove()方法会抛出异常,而poll()方法是返回null
    E remove();
    
    // 返回并删除队列头部的第一元素,如果队列空,返回null
    E poll();
    
    // 返回但是不删除队头元素,element()方法与peek()方法的不同在于,如果队列为空,element()方法会抛出NoSuchElementException,而peek()方法返回null
    E element();
    
    // 返回队头元素,如果队列为空,返回null
    E peek();
} 

队列的操作无非就是上述的插入和删除操作,从上述方法的定义来看,优先使用offer()和poll(),因为不抛异常的方法比较容易处理。

 

2. ConcurrentLinkedQueue是什么?

    ConcurrentLinkedQueue是基于链接节点实现的无界的线程安全的先进先出的非阻塞的队列。其链接节点的结构为:

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }

   每一个链接节点(Node)包含节点元素(item)和指向下一个节点的引用(next)。ConcurrentLinkedQueue包含一个头结点和一个尾节点

    // 头结点,所有后继节点都可以从head开始,使用succ()方法访问到
private transient volatile Node<E> head; // 尾节点, private transient volatile Node<E> tail;

  头尾节点都提到了succ()方法,succ()方法是

    // sicc()方法是返回节点p的后继节点。如果节点p的后继节点指向自己,则返回头结点。这种情况是如何发生的?(节点p已经不在链表中了?)
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

 succ()方法主要用途有什么?

   (1). 求队列大小  

    // 返回队列中元素个数,可以看到元素个数是int类型, 如果元素个数超过了Integer.MAX_VALUE的话,也只能返回Integer.MAX_VALUE
    // 另外,这个方法返回的值是不精确的。当然我们不是来看size()方法的,是来看succ()方法是如何使用的。
    public int size() {
        int count = 0; 
        // 从第一个节点开始遍历,如果节点不为null,统计节点个数,然后使用succ()方法获取下一个节点
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

  

   (2). contains()方法中succ()的用法与求队列大小类似

    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

  

3. ConcurrentLinkedQueue的构造函数为:

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

 从构造函数看,ConcurrentLinkedQueue的头结点是包含null元素的一个节点,并且初始条件下head节点指向tail节点。

 接下来看下head和tail是如何在offer()和poll()方法中怎么使用的。

// 插入元素到队尾
public boolean offer(E e) {
  // 检查元素e是否为null,如果为null,抛出NullPointerException
  checkNotNull(e);
  // 创建新节点newNode
  final java.util.concurrent.ConcurrentLinkedQueue.Node<E>
      newNode = new java.util.concurrent.ConcurrentLinkedQueue.Node<E>(e);
  // 首先赋值tail给t (t = tail),赋值t给p (p = t)
  // 然后执行死循环for(;;)
  for (java.util.concurrent.ConcurrentLinkedQueue.Node<E> t = tail, p = t;;) {
    // 将p的next赋值给q, p.next -> q
    java.util.concurrent.ConcurrentLinkedQueue.Node<E> q = p.next;
    // 如果q为null,表示p是尾节点
    if (q == null) {
      // p是尾节点,将新节点newNode赋值给p的next,p.next -> e(newNode)
      // 这个赋值过程是使用CAS来实现的,CAS比较并交换,意思就是如果newNode != null,则交换他们
      if (p.casNext(null, newNode)) {
        // 如果p != t,即p != t = tail,表示t(= tail)不是尾节点
        if (p != t)
          // 将t置为尾节点,该操作允许失败,因此t(= tail)并不总是尾节点
          // 因此需要执行for(;;),先找到尾节点
          casTail(t, newNode);  // Failure is OK.
        return true;
      }
      // Lost CAS race to another thread; re-read next
    }
    else if (p == q)
      // 如果p == q, 说明尾节点tail已经不在链表中了,
      // 这种情况下,跳转到head,因为从head开始所有的节点都可达
      p = (t != (t = tail)) ? t : head;
    else
      // 如果p == q且q == null,p指向q,即p跳转到下一个元素
      p = (p != t && t != (t = tail)) ? t : q;
  }
}

  

public E poll() {
  // 跳出for(;;)循环的标志位
  restartFromHead:
  for (;;) {
    // 首先赋值head给h (h = head),赋值h给p (p = h),并定义变量q
    // 然后执行死循环for(;;)
    for (java.util.concurrent.ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
      // 获取p的元素值,即头节点的元素值
      E item = p.item;
      // 如果元素值不为null,并将p的元素置null
      // casItem(item, null)意思是如果item != null,则交换两者
      // 交换之后,item就从队列中被移除了
      if (item != null && p.casItem(item, null)) {
        if (p != h) // 如果p不是指向h (head),更新head的值
          updateHead(h, ((q = p.next) != null) ? q : p);
        return item;
      }
      else if ((q = p.next) == null) { // 说明元素为空
        updateHead(h, p);
        return null;
      }
      else if (p == q)
        continue restartFromHead;
      else
        p = q;
    }
  }
}

 

4. 生产者消费者使用ConcurrentLinkedQueue

import java.util.concurrent.ConcurrentLinkedQueue;

public class ProducerAndConsumer {
  private static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();


  static class Producer extends Thread {
    String name;

    public Producer(String name) {
      this.name = name;
    }

    public void run() {
      for (int i = 0; i < 10; i++) {
        queue.offer(i);
        System.out.println(name + " : " + i);
        try {
          Thread.sleep(1 * 1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }


  static class Consumer extends Thread {
    String name;

    public Consumer(String name) {
      this.name = name;
    }

    public void run() {
      for (;;) {
        Object item = queue.poll();
        System.out.println(name + " : " + item);
        try {
          Thread.sleep(1 * 1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }

  public static void main(String[] args) {
    new Producer("p1").start();
    new Producer("p2").start();
    new Consumer("c1").start();
    // new Consumer("c2").start();
  }
}

  

 

目录
相关文章
|
8月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
84 0
|
18天前
|
存储 安全 Java
ConcurrentLinkedQueue详解
通过本文的介绍,希望您能够深入理解 `ConcurrentLinkedQueue`的工作原理、主要特性、常用方法以及实际应用,并在实际开发中灵活运用这些知识,编写出高效、健壮的并发程序。
24 3
|
4月前
|
存储 安全 算法
JUC集合: ConcurrentLinkedQueue详解
与此同时,它的无界特性在使用时需要注意,因为过多的数据累积可能会导致内存消耗过大。合理应用 `ConcurrentLinkedQueue` 不仅可以提升应用性能,还能提高程序在并发环境下的可靠性。在实际的开发过程中,合理选择适当的并发容器对于构建高效稳定的系统至关重要。
52 2
|
5月前
ArrayBlockingQueue原理
文章主要介绍了ArrayBlockingQueue的工作原理。ArrayBlockingQueue通过ReentrantLock和Condition实现了高效的阻塞队列,能够有效地避免CPU资源浪费。它非常适合用于生产者-消费者模型的应用场景,特别是需要控制生产者和消费者线程同步的场合。
|
7月前
|
存储 安全 Java
深入探索Java并发编程:ArrayBlockingQueue详解
深入探索Java并发编程:ArrayBlockingQueue详解
|
8月前
|
存储 缓存 Java
Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue
SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下: 1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。 2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。 3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先
138 5
|
8月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
226 1
|
8月前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
31 0
|
算法 安全 Java
JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解
JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解
115 0
|
存储 Java 容器
JUC第十八讲:JUC集合-BlockingQueue 详解
JUC第十八讲:JUC集合-BlockingQueue 详解

热门文章

最新文章