【并发容器精讲三、】并发队列Queue

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 1. 为什么要使用队列- 用队列可以在线程间传递数据:生产者,消费者模式,银行转帐- 考虑锁等线程安全问题重任 转移到队列上# 2. 并发队列简介简单介绍各个并发并发队列的关系,并发队列是指线程安全的队列,包含:阻塞队列和非阻塞队列

@[toc]

1. 为什么要使用队列

  • 用队列可以在线程间传递数据:生产者,消费者模式,银行转帐
  • 考虑锁等线程安全问题重任 转移到队列上

2. 并发队列简介

简单介绍各个并发并发队列的关系,并发队列是指线程安全的队列,包含:阻塞队列和非阻塞队列,区别如下。

  • 阻塞队列:满了之后不能再插入,当队列为空的时候,读不到会阻塞
  • 非阻塞队列:和阻塞队列完全不一样的

3. 各并发队列关系图

在这里插入图片描述
彩蛋 IDEA 生成UML

在这里插入图片描述
会出现 UML 图
在这里插入图片描述

4. 阻塞队列 BlockingQueue

1.什么是阻塞队列
: 阻塞队列是具有阻塞功能到队列,所以他首先是一个队列,其次他具有阻塞功能
: 通常, 阻塞队列的一端是给生产者放数据用的,另一端给消费者那数据用的。阻塞队列是线程安全的所以 生产者 消费者都可以是多线程的
: 阻塞功能:最有特色的两个带有阻塞功能方法是

  • task()方法,获取并移除队列的头节点,一旦执行task 任务的时候,队列里无数据 则阻塞
  • put() 方法 插入元素,但是如果队列已满,那就无法继续插入,则阻塞,直到有空闲的空间
  • 我们需要考虑的是是否有界(容量有多大):这是一个非常重要的属性,无界队列意味着可以容纳非常多的一个数 Integer.MAX_VALUE)约为2的31次
  • 阻塞队列 和 线程池的关系:阻塞队列是线程池重要组成部分

2.主要方法介绍
: put 、 take
: add、 remove、 element
: offer 、pull 、peek

ArrayBlockingQueue
: 有界
: 指定容量
: 公平 :还可以指定是否公平,如果指定公平,那么等待时间较长的线程会被优先处理,不过会带来一定的性能消耗
案例 :10个人面试,一个面试官,3个位置可以休息,每个人面试时间是10秒,模拟所有人面试场景

 package com.yxl.task;


import lombok.SneakyThrows;
import org.omg.PortableInterceptor.INACTIVE;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ArrayBlockingQueueDemo {


    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue= new ArrayBlockingQueue<String>(3);
        Interviewer interviewer = new Interviewer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(interviewer).start();
        new Thread(consumer).start();

    }



}


class Interviewer implements  Runnable{

    ArrayBlockingQueue<String> blockingDeque;

    public Interviewer(ArrayBlockingQueue blockingDeque) {
        this.blockingDeque = blockingDeque;
    }

    @Override
    public void run() {
        System.out.println("10个候选人");
        for (int i = 0; i < 10 ; i++) {
            try {
                blockingDeque.put("wo"+i);
                System.out.println("安排好了"+ i);
            }catch (Exception e){
                e.getMessage();
            }
        }
        try {
            blockingDeque.put("stop");
        }catch (Exception e){
            e.getMessage();
        }

    }
}


class Consumer implements  Runnable{

    ArrayBlockingQueue<String>  blockingDeque;

    public Consumer(ArrayBlockingQueue blockingDeque) {
        this.blockingDeque = blockingDeque;
    }

    @SneakyThrows
    @Override
    public void run() {
        Thread.sleep(1000);
        String take ;
        while (!(take = blockingDeque.take()).equals("stop")){
            System.out.println("打印"+take);
        }
            System.out.println("结束 ");
    }
}

运行结果
在这里插入图片描述
LinkedBlockingQueue
: 无界
: 容量可以达到 Integet.MAX_VALUE
: 内部结果 Node 、两把锁、分析put 方法

源码

 /**
     * Linked list node class
     */
     //有一个一个的Node
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

两把锁

//lock 锁 take 和  poll的锁
 /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

//put 和 offer 的锁
 /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

put() 方法 源码

/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
// 判断数据是否为空
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
       //将数据放入Node中
        Node<E> node = new Node<E>(e);
        
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加锁
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
             //如果满了 等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //否则存队列
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

PriorityBlockingQueue
: 支持优先级
: 自然顺序 (不是先进先出的)
: 无界队列
: PriorityQueue 的一个安全的版本

SynchronusQueue
: 他的容量为0
: SynchronusQueue的容量不是1 而是0,因为他不需要去持有元素,他是直接传递的
: 效率很高
: 是一个很好的用来直接传递的并发数据结构

在这里插入图片描述

5. 非阻塞队列

  • 并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法实现线程安全(不具备阻塞功能)适用于性能比较高的要求

源码

 /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
// for 死循环 做cas
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // cas 
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }



       // 运用 cas算法   compareAndSwapObject
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

6. 如何选择适合的队列

从这几个方面选择

  • 边界
  • 空间
  • 吞吐量
相关文章
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
6月前
|
安全 程序员 C++
C++一分钟之-C++中的并发容器
【7月更文挑战第17天】C++11引入并发容器,如`std::shared_mutex`、`std::atomic`和线程安全的集合,以解决多线程中的数据竞争和死锁。常见问题包括原子操作的误用、锁的不当使用和迭代器失效。避免陷阱的关键在于正确使用原子操作、一致的锁管理以及处理迭代器失效。通过示例展示了如何安全地使用这些工具来提升并发编程的安全性和效率。
80 1
|
6月前
|
缓存 安全 Java
Java中的并发容器:ConcurrentHashMap详解
Java中的并发容器:ConcurrentHashMap详解
|
6月前
|
安全 Java 容器
第一篇:并发容器学习开篇介绍
第一篇:并发容器学习开篇介绍
49 4
|
6月前
|
存储 安全 算法
(九)深入并发编程之并发容器:阻塞队列、写时复制容器、锁分段容器原理详谈
相信大家在学习JavaSE时都曾接触过容器这一内容,一般Java中的容器可分为四类:Map、List、Queue以及Set容器,而在使用过程中,对于ArrayList、HashMap等这类容器都是经常使用的,但问题在于这些容器在并发环境下都会存在线程安全问题。
|
7月前
|
设计模式 存储 C++
【C++/STL】:stack/queue的使用及底层剖析&&双端队列&&容器适配器
【C++/STL】:stack/queue的使用及底层剖析&&双端队列&&容器适配器
82 2
|
7月前
|
C++ 容器
C++之Queue容器
C++之Queue容器
|
6月前
|
设计模式 存储 缓存
【C++】详解STL容器之一的deque和适配器stack,queue
【C++】详解STL容器之一的deque和适配器stack,queue
|
8月前
|
存储 算法 C语言
从C语言到C++_19(容器适配器+stack和queue模拟实现+优先级队列priority_queue)(下)
从C语言到C++_19(容器适配器+stack和queue模拟实现+优先级队列priority_queue)
62 2
|
8月前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
74 1