并发队列简介

简介: 并发队列简介

并发包下面的并发容器中,ConcurrentLinkedQueue 这种 Concurrent 类型和的集合才真正代表并发。


640.png

  • Concurrent 类型基于 lock-free ,常见的多线程访问场景,一般可以提供高吞吐量。
  • LinkedBlockingQueue 内部基于锁实现,提供了BlockingQueue等特性方法。


java.util.concurrent 包并发容器分类



  • Concurrent
  • CopyOnWrite
  • Blocking


Concurrent 类型集合


  1. Concurrent 类型没有类似 CopyOnWrite 之类容器相对较重的修改开销。
  2. Concurrent 往往提供了较低的遍历一致性,可以理解所谓的弱一致性,例如,当迭代器遍历时,如果容器发生修改,迭代器仍然可以继续遍历。
  3. 弱一致性的另外一个体现是, size 等操作准确性有限的,未必是100%准确。
  4. 读取的性能具有一定的不确定性

Java 集合中有个线程不安全的队列 LinkedList 是个 Deque。


ConcurrentLinkedDeque 和 LinkedBlockingQueue 区别



能够理解 ConcurrentLinkedQueue 和 LinkedBlockingQueue 的主要区别。在常规队列操作基础上, Blocking意味着提供了特定的等待性操作。


阻塞队列  LinkedBlockingQueue


适用阻塞队列的好处,多线程操作共同的队列时不需要额外的同步。


  • void put(E e): 在队尾插入元素,方法在队列满的时候会阻塞直到有队列成员被消费
  • boolean offer(E e): 在队尾插入元素,方法在队列满的时候不会阻塞,直接返回 false
  • E take(): 取出并删除队列中的首元素,如果队列为空,会阻塞,直到有队列成员被放进来
  • E poll(): 取出并删除队列中的首元素,如果队列为空,则返回 null,不进行阻塞
  • E peek(): 取出第一个元素但是不删除它,没有就返回 null


非阻塞队列 ConcurrentLinkedQueue


允许多线程访问一个共同集合,可以使用 ConcurrentLinedQueue Queue 中元素按 FIFO 原则进行排序。采用CAS 操作,来保证元素的一致性。

  • boolean offer(E e): 在队尾插入元素,不进行阻塞
  • E poll(): 取出并删除队列中的首元素,不进行阻塞
  • E peek(): 取出第一个元素但是不删除它,不进行阻塞

ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列的头尾都信息插入和删除:

  • 尾部插入时需要的addLast(e)、 offerLast(e)。
  • 尾部删除所需要的removeLast()、 pollLast()。


阻塞队列 BlockingQueue 分类


  • ArrayBlockingQueue

ArrayBlockingQueue是最典型的的有界队列,其内部以final的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建ArrayBlockingQueue时,都要指定容量,如:


public ArrayBlockingQueue(int capacity, boolean fair)
  • LinkedBlockingQueue

其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为Integer.MAX_VALUE,成为了无界队列。

  • SynchronousQueue

每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。这个队列的容量是0。

  • PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。
  • DelayedQueue 和 LinkedTransferQueue

都是无边界队列,就是 put 操作永远不会发生其他 BlockingQueue 阻塞的情况。


LinkedBlockingQueue 实现原理


/** Lock held by take, poll, etc */
private fnal ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private fnal Condition notEmpty = takeLock.newCondition();
/** Lock held by put, ofer, etc */
private fnal ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private fnal Condition notFull = putLock.newCondition();

LinkedBlockingQueue 改进了锁操作的力度,头,尾操作使用不同的锁,在通用的场景下,他的吞吐量相对要更好一些。


public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

ArrayBlockingQueue  take 实现原理


public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

队列的使用场景


队列的典型使用场景是 生产者 -消费者 场景。


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConsumerProducer {
    public static final String EXIT_MSG = "Good bye!";
    public static void main(String[] args) {
        // 使用较小的队列,以更好地在输出中展示其影响
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
    static class Producer implements Runnable {
        private BlockingQueue<String> queue;
        public Producer(BlockingQueue<String> q) {
            this.queue = q;
        }
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                try {
                    Thread.sleep(5L);
                    String msg = "Message" + i;
                    System.out.println("Produced new item: " + msg);
                    queue.put(msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                System.out.println("Time to say good bye!");
                queue.put(EXIT_MSG);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static class Consumer implements Runnable {
        private BlockingQueue<String> queue;
        public Consumer(BlockingQueue<String> q) {
            this.queue = q;
        }
        @Override
        public void run() {
            try {
                String msg;
                while (!EXIT_MSG.equalsIgnoreCase((msg = queue.take()))) {
                    System.out.println("Consumed item: " + msg);
                    Thread.sleep(10L);
                }
                System.out.println("Got exit message, bye!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}




相关文章
|
5天前
|
存储 缓存 算法
10分钟从实现和使用场景聊聊并发包下的阻塞队列
10分钟从实现和使用场景聊聊并发包下的阻塞队列
|
5天前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
63 2
|
5天前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
50 1
|
5天前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
77 2
|
5天前
|
存储 调度
FreeRTOS深入教程(队列内部机制和源码分析)
FreeRTOS深入教程(队列内部机制和源码分析)
73 0
|
9月前
|
安全 Java 容器
多线程案例(2)-阻塞式队列
多线程案例(2)-阻塞式队列
43 0
|
存储
什么是队列,如何实现?
什么是队列,如何实现?
84 0
什么是队列,如何实现?
|
缓存 网络协议 Java
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
【Java原理探索】教你如何使用「精巧好用」的DelayQueue(延时队列)
137 0
|
消息中间件
消息队列:第四章:延迟检查队列
消息队列:第四章:延迟检查队列
128 0
消息队列:第四章:延迟检查队列
|
消息中间件 存储 NoSQL
基于Redis实现DelayQueue延迟队列设计方案(附源码)
基于Redis实现DelayQueue延迟队列设计方案(附源码)
基于Redis实现DelayQueue延迟队列设计方案(附源码)