Java并发编程学习7-阻塞队列

简介: 本篇介绍阻塞队列相关的内容(Queue、BlockingQueue、Deque 和 BlockingDeque)

java-concurrency-logo.png

引言

介绍阻塞队列之前,先来介绍下队列 QueueQueue 用来临时保存一组等待处理的元素。它提供了几种非阻塞队列实现,如下:

  • ConcurrentLinkedQueue,这是一个传统的先进先出队列。
  • PriorityQueue,这是一个(非并发的)优先队列。

如上两个队列的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。

阻塞队列 BlockingQueue 扩展了 Queue,增加了可阻塞的 puttake 方法,以及支持定时的 offerpoll 方法。如果队列已经满了,那么 put 方法将阻塞直到有空间可用;如果队列为空,那么 take 方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的 put 方法也永远不会阻塞。

阻塞队列支持 生产者--消费者 这种设计模式。当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。一种最常见的 生产者--消费者 设计模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式,这也是后面的博文中将要介绍的内容。

Java 类库中包含了 BlockingQueue 的多种实现,如下:

  • LinkedBlockingQueueArrayBlockingQueueFIFO 队列,二者分别与 LinkedListArrayList 类似,但比同步 List 拥有更好的并发性能。
  • PriorityBlockingQueue 是一个按优先级排序的队列,它既可以根据元素的自然顺序来比较元素(前提是这些元素实现了Comparable方法),也可以使用 Comparator 来比较。
  • SynchronousQueue ,实际上不能算一个队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。因为 SynchronousQueue 没有存储功能,因此 puttake 会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

主要内容

1. “桌面搜索” 示例

如下 FileCrawler 中给出了一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。

    public class FileCrawler implements Runnable {
        private final BlockingQueue<File> fileQueue;
        
        private final FileFilter fileFilter;
        
        private final File root;
        
        public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
            this.fileQueue = fileQueue;
            this.fileFilter = fileFilter;
            this.root = root;
        }
        
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }    
        }
        
        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries) 
                    if (entry.isDirectory()) 
                        crawl(entry);
                    else if (!FileRecord.alreadyIndexed(entry)) 
                        fileQueue.put(entry);
            }
        }
    }

如下 Indexer 中给出了一个消费者任务,即从队列中取出文件名称并对它们建立索引,它会一直运行下去。

public class Indexer implements Runnable {

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(Indexer.class);

    private final BlockingQueue<File> queue;

    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while(true) {
                File file = queue.take();
                FileRecord.indexFile(file);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(file.getAbsolutePath());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

生产者--消费者 模式提供了一种适合线程的方法将桌面搜索问题分解为更简单的组件。将文件遍历与建立索引等功能分解为独立的操作,每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流程,因此每个功能的代码都更加简单和清晰。

下面我们再看一个测试代码示例,用于启动桌面搜索。

public class FileCrawlerTest {

    private static final int BOUND = 1000;

    private static final int N_CONSUMERS = 5;

    public static void main(String[] args) throws Exception {
        File file = new File("E:\\fleaworkspace");
        File file1 = new File("E:\\Software\\Maven\\Repository");
        File[] roots = {file, file1};
        startIndexing(roots);
    }

    private static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(BOUND);
        FileFilter fileFilter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        for (File root : roots)
            new Thread(new FileCrawler(queue, fileFilter, root)).start();

        for (int i = 0; i < N_CONSUMERS; i++)
            new Thread(new Indexer(queue)).start();
    }
}

这里启动了多个文件搜索程序和索引简历程序,每个程序都在各自的线程中运行。前面讲到,消费者线程永远不会退出,因而程序无法终止,在后续的博文将介绍多种技术来解决这个问题。

2. 串行线程封闭

java.util.concurrent 中实现的各种阻塞队列都包含了足够的内部同步机制,从而安全地将对象从生产者线程发布到消费者线程。

对于可变对象,生产者--消费者 这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。线程封闭对象只能由单个线程拥有,但可以通过安全地发布该对象来 “转移” 所有权。在转移所有权后,也只有另一个线程能获得这个对象的的访问权限,并且发布对象的线程不会再访问它。这种安全的发布确保了对象状态对于新的所有者来说是可见的,并且由于最初的所有者不会再访问它,因此对象被封闭在新的线程中。新的所有者线程可以对该对象做任意修改,因为它具有独占的访问权。

对象池利用了串行线程封闭,将对象“借给”一个请求线程。只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之间传递所有权。

3. 双端队列与工作密取

Java 6 增加两种容器类型,DequeBlockingDeque,他们分别对 QueueBlockingQueue 进行了扩展。

Deque 是一个双端队列,实现了在队列头和队列尾的高效插入和移除,具体实现包括:

  • ArrayDeque
  • LinkedBlockingDeque

正如阻塞队列适用于 生产者--消费者 模式,双端队列同样适用另一种相关模式,即 工作密取(Work Stealing)。

在生产者--消费者模式中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。工作密取模式比传统的生产者--消费这模式具有更高的可伸缩性。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。

工作密取非常适用于既是消费者也是生产者问题---当执行某个工作时可能导致出现更多的工作。例如网页爬虫处理页面、搜索图的算法、在垃圾回收阶段对堆进行标记等。当一个工作线程找到新的任务单元时,它会将其放到自己队列的末尾(或者放入其他工作线程的队列中)。当双端队列为空时,它会在另一个线程的队列队尾查找新的任务,从而确保每个线程都保持忙碌状态。

4. 阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从 Thread.sleep 方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKEDWAITINGTIME_WAITING)。

BlockingQueueputtake 等方法会抛出受检查异常 InterruptedException,这与类库中其他一些方法的做法相同,例如 Thread.sleep,当某方法抛出 InterruptedException 时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。Thread 提供了 interrupt 方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

中断是一种协作机制。当线程 A 中断 B 时,A 仅仅是要求 B 在执行到某个可以暂停的地方停止正在执行的操作(当然前提是如果线程 B 愿意停止下来)。最常使用中断的情况就是取消某个操作,如果程序对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。

当在代码中调用了一个将抛出 InterruptedException 的方法时,自身方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。

这里有两种常见的方法:

  • 传递 InterruptedException,只需要把 InterruptedException 传递给方法的调用者,要么根本不捕获异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
  • 恢复中断,当代码是 Runnable 的一部分时,在这种情况下必须捕获 InterruptedException,并通过调用当前线程上的 interrupt 方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。

下面看下恢复中断状态的示例:

    public class TaskRunnable implements Runnable {
        BlockingQueue<Task> queue;
        
        // ...
        
        public void run() {
            try {
                processTask(queue.take());
            } catch (InterruptedException e) {
                // 恢复被中断的状态
                Thread.currentThread().interrupt();
            }
        }
    }

总结

当然还可以采用一些更复杂的中断处理方法,但上述两种方法已经可以应对大多数情况了。关于取消和中断等操作,这里只是简单提及,笔者将会在后续的博文中进一步介绍,敬请期待!!!

目录
相关文章
|
29天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
31 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
11天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
15天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
49 12
|
11天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
93 2
|
28天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
28天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
50 3
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
158 6
|
1月前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
47 3
|
2月前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
69 1