@[toc]
1. 为什么要使用队列
- 用队列可以在线程间传递数据:生产者,消费者模式,银行转帐
- 考虑锁等线程安全问题重任 转移到队列上
2. 并发队列简介
简单介绍各个并发并发队列的关系,并发队列是指线程安全的队列,包含:阻塞队列和非阻塞队列,区别如下。
- 阻塞队列:满了之后不能再插入,当队列为空的时候,读不到会阻塞
- 非阻塞队列:和阻塞队列完全不一样的
3. 各并发队列关系图
彩蛋 IDEA 生成UML
会出现 UML 图
4. 阻塞队列 BlockingQueue
1.什么是阻塞队列
: 阻塞队列是具有阻塞功能到队列,所以他首先是一个队列,其次他具有阻塞功能
: 通常, 阻塞队列的一端是给生产者放数据用的,另一端给消费者那数据用的。阻塞队列是线程安全的所以 生产者 消费者都可以是多线程的
: 阻塞功能:最有特色的两个带有阻塞功能方法是
- task()方法,获取并移除队列的头节点,一旦执行task 任务的时候,队列里无数据 则阻塞
- put() 方法 插入元素,但是如果队列已满,那就无法继续插入,则阻塞,直到有空闲的空间
- 我们需要考虑的是是否有界(容量有多大):这是一个非常重要的属性,无界队列意味着可以容纳非常多的一个数 Integer.MAX_VALUE)约为2的31次
- 阻塞队列 和 线程池的关系:阻塞队列是线程池重要组成部分
2.主要方法介绍
: put 、 take
: add、 remove、 element
: offer 、pull 、peek
ArrayBlockingQueue
: 有界
: 指定容量
: 公平 :还可以指定是否公平,如果指定公平,那么等待时间较长的线程会被优先处理,不过会带来一定的性能消耗
案例 :10个人面试,一个面试官,3个位置可以休息,每个人面试时间是10秒,模拟所有人面试场景
package com.yxl.task;
import lombok.SneakyThrows;
import org.omg.PortableInterceptor.INACTIVE;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
ArrayBlockingQueue<String> queue= new ArrayBlockingQueue<String>(3);
Interviewer interviewer = new Interviewer(queue);
Consumer consumer = new Consumer(queue);
new Thread(interviewer).start();
new Thread(consumer).start();
}
}
class Interviewer implements Runnable{
ArrayBlockingQueue<String> blockingDeque;
public Interviewer(ArrayBlockingQueue blockingDeque) {
this.blockingDeque = blockingDeque;
}
@Override
public void run() {
System.out.println("10个候选人");
for (int i = 0; i < 10 ; i++) {
try {
blockingDeque.put("wo"+i);
System.out.println("安排好了"+ i);
}catch (Exception e){
e.getMessage();
}
}
try {
blockingDeque.put("stop");
}catch (Exception e){
e.getMessage();
}
}
}
class Consumer implements Runnable{
ArrayBlockingQueue<String> blockingDeque;
public Consumer(ArrayBlockingQueue blockingDeque) {
this.blockingDeque = blockingDeque;
}
@SneakyThrows
@Override
public void run() {
Thread.sleep(1000);
String take ;
while (!(take = blockingDeque.take()).equals("stop")){
System.out.println("打印"+take);
}
System.out.println("结束 ");
}
}
运行结果
LinkedBlockingQueue
: 无界
: 容量可以达到 Integet.MAX_VALUE
: 内部结果 Node 、两把锁、分析put 方法
源码
/**
* Linked list node class
*/
//有一个一个的Node
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
两把锁
//lock 锁 take 和 poll的锁
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
//put 和 offer 的锁
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
put() 方法 源码
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 判断数据是否为空
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
//将数据放入Node中
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果满了 等待
while (count.get() == capacity) {
notFull.await();
}
//否则存队列
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
PriorityBlockingQueue
: 支持优先级
: 自然顺序 (不是先进先出的)
: 无界队列
: PriorityQueue 的一个安全的版本
SynchronusQueue
: 他的容量为0
: SynchronusQueue的容量不是1 而是0,因为他不需要去持有元素,他是直接传递的
: 效率很高
: 是一个很好的用来直接传递的并发数据结构
5. 非阻塞队列
- 并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法实现线程安全(不具备阻塞功能)适用于性能比较高的要求
源码
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// for 死循环 做cas
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// cas
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
// 运用 cas算法 compareAndSwapObject
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
6. 如何选择适合的队列
从这几个方面选择
- 边界
- 空间
- 吞吐量