Java多线程高并发学习笔记——阻塞队列

简介: 在探讨可重入锁之后,接下来学习阻塞队列,这篇文章也是断断续续的写了很久,因为最近开始学ssm框架,准备做一个自己的小网站,后续可能更新自己写网站的技术分享。 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/superfj/p/7757876.html 阻塞队列是什么? 首先了解队列,队列是数据先进先出的一种数据结构。

在探讨可重入锁之后,接下来学习阻塞队列,这篇文章也是断断续续的写了很久,因为最近开始学ssm框架,准备做一个自己的小网站,后续可能更新自己写网站的技术分享。

请尊重作者劳动成果,转载请标明原文链接:

http://www.cnblogs.com/superfj/p/7757876.html

阻塞队列是什么?

首先了解队列,队列是数据先进先出的一种数据结构。阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:

1.当阻塞队列为空时,获取队列元素的线程将等待,直到该则塞队列非空;2.当阻塞队列变满时,使用该阻塞队列的线程会等待,直到该阻塞队列变成非满。

为什么要使用阻塞队列?

在常见的情况下,生产者消费者模式需要用到队列,生产者线程生产数据,放进队列,然后消费从队列中获取数据,这个在单线程的情况下没有问题。但是当多线程的情况下,某个特定时间下,(峰值高并发)出现消费者速度远大于生产者速度,消费者必须阻塞来等待生产者,以保证生产者能够生产出新的数据;当生产者速度远大于消费者速度时,同样也是一个道理。这些情况都要程序员自己控制阻塞,同时又要线程安全和运行效率。

阻塞队列的出现使得程序员不需要关心这些细节,比如什么时候阻塞线程,什么时候唤醒线程,这些都由阻塞队列完成了。

阻塞队列的主要方法

 阻塞队列的方法,在不能立即满足但可能在将来某一时刻满足的情况下,按处理方式可以分为三类:

抛出异常:抛出一个异常;

特殊值:返回一个特殊值(null或false,视情况而定)

则塞:在成功操作之前,一直阻塞线程

超时:放弃前只在最大的时间内阻塞

 

工欲善其事必先利其器,学会用阻塞队列,必须要知道它有哪些方法,怎么用,有哪些注意事项,这样到真正使用的时候,就能少踩雷了。

首先介绍插入操作:

1.public abstract boolean add(E paramE);

 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。

如果该元素是NULL,则会抛出NullPointerException异常。

 

2.public abstract boolean offer(E paramE);

将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。

 

3.public abstract void put(E paramE) throws InterruptedException;

 将指定元素插入此队列中,将等待可用的空间(如果有必要)

 

4.offer(E o, long timeout, TimeUnit unit)

可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

 

获取数据操作:

1.poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

2.poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间

超时还没有数据可取,返回失败。

3.take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 

4.drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

 

重点方法重点介绍

首先来看put方法

public void put(E paramE) throws InterruptedException {
        checkNotNull(paramE);
        ReentrantLock localReentrantLock = this.lock; localReentrantLock.lockInterruptibly(); try { while (this.count == this.items.length) this.notFull.await(); enqueue(paramE); localReentrantLock.unlock(); } finally { localReentrantLock.unlock(); } }

一行一行来看代码,首先进行空校验。checkNotNull(paramE);

private static void checkNotNull(Object paramObject) {
        if (paramObject != null) return; throw new NullPointerException(); }

这是一个私有方法,需要注意的就是如果put的参数为空,则抛出空指针异常。(这个很值得我们学习,先进行空校验,在维护的时候就很容易定位错误),接着 ReentrantLock localReentrantLock = this.lock;实例化锁,这个ReentrantLock 在我之前的博客中也介绍过,可以共同探讨一下。

下一行localReentrantLock.lockInterruptibly();这里特别强调一下:

lockInterruptibly()允许在等待时由其他线程的Thread.interrupt()方法来中断等待线程而直接返回,这时是不用获取锁的,而会抛出一个InterruptException。而ReentrantLock.lock()方法则不允许Thread.interrupt()中断,即使检测到了Thread.interruptted一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功之后在把当前线程置为interrupted状态。

 

注意这里已经锁住,每次进行此操作时时候只有一个线程,回到代码中,接着进行

while (this.count == this.items.length)
          this.notFull.await();

这里向我们说明了一个信息,当队列满的时候,将会等待。这里使用了private final Condition notFull;这个实例化的Condition,这个Condition用来控制队列满的等待。

 接着执行了enqueue(paramE)方法,进入这个方法来继续看

private void enqueue(E paramE) {
        Object[] arrayOfObject = this.items;
        arrayOfObject[this.putIndex] = paramE; if (++this.putIndex == arrayOfObject.length) this.putIndex = 0; this.count += 1; this.notEmpty.signal(); }

来看第一行,Object[] arrayOfObject = this.items;这个items是在构造器时候实例化的,final Object[] items = new Object[paramInt];将item赋值到arrayObject中

继续 arrayOfObject[this.putIndex] = paramE;将put方法传入的参数赋值到arrayOfObject中,这里其实是items也改变了,因为java是值引用的缘故。

if (++this.putIndex == arrayOfObject.length)
            this.putIndex = 0;

如果这个偏移值+1之后等于数组的长度,那么偏移值变为0。this.count += 1;count值加1;这个count代表数组的总数。this.notEmpty.signal();唤醒被Condition notEmpty阻塞的方法,最后 localReentrantLock.unlock();解锁(这个操作不能够忘了)

这里不禁要问,是什么方法被阻塞了呢?带着这个疑问来看take方法。

 

public E take() throws InterruptedException {
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try { while (this.count == 0) this.notEmpty.await(); Object localObject1 = dequeue(); return localObject1; } finally { localReentrantLock.unlock(); } }

 首先看前两行,和put方法一样先上锁,使得每次持有本段代码的时候只有一个线程。

while (this.count == 0)
   this.notEmpty.await();

当数组的数量为空时,也就是无任何数据供区出来的时候,notEmpty这个Condition就会进行阻塞,知道被notEmpty唤醒,还记得上文提到的吗。就是在put方法中唤醒的,这里可以发现,只要成功进行一个put操作,就会唤醒一次。

 继续看代码,接着执行Object localObject1 = dequeue();获取元素,跟进dequeue()方法继续:

private E dequeue() {
        Object[] arrayOfObject = this.items;
        Object localObject = arrayOfObject[this.takeIndex];
        arrayOfObject[this.takeIndex] = null;
        if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;
        this.count -= 1;
        if (this.itrs != null)
            this.itrs.elementDequeued();
        this.notFull.signal();
        return localObject;
    }

Object[] arrayOfObject = this.items;进行值传递操作,takeIndex是取元素的时候的偏移值,由此可见,put和take操作的偏移量分别是由putIndex和takeIndex控制的。

Object localObject = arrayOfObject[this.takeIndex];取出在数组中的数据,然后 arrayOfObject[this.takeIndex] = null;将原来位置的数据b变成null.

if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;

如果当前的++takeIndex等于该数组的长度,则takeIndex赋值0,结合put方法,这两个操作是用数组形成队列操作。接着唤醒持有notFull这个Condition的线程。

方法就总结到这里,其实看put和take是有很多相似之处的,继续看下一章节。

常见的阻塞队列

首先来看这张图,这个是阻塞队列的继承图(双端队列,没有列出来,没有太大区别)

主要有ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,DelayQueue这个五个实现类。

在这五个阻塞队列中,比较常用的是ArrayBlockingQueue,LinkedBlockingQueue,本文也会重点介绍这两个类。

ArrayBlockingQueue

在上面的源码分析中就是分析的ArrayBlockingQueue的源码。数组阻塞队列必须传入的参数是数组大小,还可以指定是否公平性。公平性就是当队列可用时,线程访问队列的顺序按照它排队时候的顺序,非公平锁则不按照这样的顺序,但是非公平队列要比公平队列执行的速度快。

继续看ArrayBlockingQueue其实是一个数组有界队列,此队列按照先进先出的原则维护数组中的元素顺序,看源码可知,是由两个整形变量(上文提到的putIndex和takeIndex)分别指着头和尾的位置。

LinkedBlockingQueue

LinkedBlockingQueue是基于链表的阻塞队列,内部维持的数据缓冲队列是由链表组成的,也是按照先进先出的原则。

如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小(Integer.Max_VALUE)的容量,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。

LinkedBlockingQueue之所以能够高效的处理并发数据,是因为take()方法和put(E param)方法使用了不同的可重入锁,分别为private final ReentrantLock putLock和private final ReentrantLock takeLock,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

 

两者对比

1.ArrayBlockingQueue在put,take操作使用了同一个锁,两者操作不能同时进行,而LinkedBlockingQueue使用了不同的锁,put操作和take操作可同时进行。

2.ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

 

 其他还有优先级阻塞队列:PriorityBlockingQueue延时队列:DelayQueue,SynchronousQueue等,因为使用频率较低,这里就不重点介绍了,有兴趣的读者可以深入研究。

 

用阻塞队列实现生产者消费者

模拟洗盘子的经历,洗碗工洗好一个盘子放在工作台上,然后厨师看到工作台上有空余的盘子,便使用盘子。写到代码里就是洗碗工就是一个生产者线程,厨师就是消费者线程,工作台就是阻塞队列。

public class TestBlockingQueue {
    /**
     * 生产和消费业务操作
     * 
     * @author tang
     *
     */
    protected class WorkDesk {

        BlockingQueue<String> desk = new LinkedBlockingQueue<String>(10);

        public void washDish() throws InterruptedException {
            desk.put("洗好一个盘子");
        }

        public String useDish() throws InterruptedException {
            return desk.take();
        }
    }

    /**
     * 生产者类
     * 
     * @author tang
     *
     */
    class Producer implements Runnable {

        private String producerName;
        private WorkDesk workDesk;

        public Producer(String producerName, WorkDesk workDesk) {
            this.producerName = producerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(producerName + "洗好一个盘子");
                    workDesk.washDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 消费者类
     * 
     * @author tang
     *
     */
    class Consumer implements Runnable {
        private String consumerName;
        private WorkDesk workDesk;

        public Consumer(String consumerName, WorkDesk workDesk) {
            this.consumerName = consumerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(consumerName + "使用一个盘子");
                    workDesk.useDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        TestBlockingQueue testQueue = new TestBlockingQueue();
        WorkDesk workDesk = testQueue.new WorkDesk();

        ExecutorService service = Executors.newCachedThreadPool();
        //四个生产者线程
        Producer producer1 = testQueue.new Producer("生产者-1-", workDesk);
        Producer producer2 = testQueue.new Producer("生产者-2-", workDesk);
        Producer producer3 = testQueue.new Producer("生产者-3-", workDesk);
        Producer producer4 = testQueue.new Producer("生产者-4-", workDesk);
        //两个消费者线程
        Consumer consumer1 = testQueue.new Consumer("消费者-1-", workDesk);
        Consumer consumer2 = testQueue.new Consumer("消费者-2-", workDesk);
        
        service.submit(producer1);
        service.submit(producer2);
        service.submit(producer3);
        service.submit(producer4);
        service.submit(consumer1);
        service.submit(consumer2);

    }

}

查看打印结果:

总的来说生产者的速度是会大于消费者的速度的,但是因为阻塞队列的缘故,所以我们不需要控制阻塞,当阻塞队列满的时候,生产者线程就会被阻塞,直到不再满。反之亦然,当消费者线程多于生产者线程时,消费者速度大于生产者速度,当队列为空时,就会阻塞消费者线程,直到队列非空。

参考资料:

《Java编程思想》

http://www.cnblogs.com/studyLog-share/p/5390745.html

http://www.cnblogs.com/dolphin0520/p/3932906.html

 

个人博客网站 http://www.janti.cn
相关文章
|
7天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
27 9
|
10天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
7天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
10天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
24 3
|
8天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
10天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
10天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
38 1
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
25天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
19 3
|
25天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2