在 Java 中,阻塞队列(Blocking Queue)是一种特殊的队列实现,它具有阻塞操作的特性,能够在队列为空或满时使线程阻塞。阻塞队列在多线程编程中广泛应用,用于线程之间的安全通信和协调。Java 提供了一些标准的阻塞队列实现:
1. ArrayBlockingQueue:
ArrayBlockingQueue
是一个基于数组结构的有界阻塞队列。它的大小在创建时就指定,并且不可动态改变。在队列满时,尝试插入元素的线程将被阻塞;在队列空时,尝试取出元素的线程将被阻塞。
importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.BlockingQueue; publicclassArrayBlockingQueueExample { publicstaticvoidmain(String[] args) { BlockingQueue<String>blockingQueue=newArrayBlockingQueue<>(5); // 生产者线程newThread(() -> { try { blockingQueue.put("Element 1"); blockingQueue.put("Element 2"); // ... } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); // 消费者线程newThread(() -> { try { Stringelement=blockingQueue.take(); // 处理元素 } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); } }
2. LinkedBlockingQueue:
LinkedBlockingQueue
是一个基于链表结构的有界或无界阻塞队列。如果创建时指定了容量,则为有界队列;如果未指定容量,则为无界队列。在队列满时,尝试插入元素的线程将被阻塞;在队列空时,尝试取出元素的线程将被阻塞。
importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.LinkedBlockingQueue; publicclassLinkedBlockingQueueExample { publicstaticvoidmain(String[] args) { BlockingQueue<String>blockingQueue=newLinkedBlockingQueue<>(5); // 生产者线程newThread(() -> { try { blockingQueue.put("Element 1"); blockingQueue.put("Element 2"); // ... } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); // 消费者线程newThread(() -> { try { Stringelement=blockingQueue.take(); // 处理元素 } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); } }
3. PriorityBlockingQueue:
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。元素插入队列时按照优先级进行排序,具有最高优先级的元素将被优先取出。不同于 ArrayBlockingQueue
和 LinkedBlockingQueue
,PriorityBlockingQueue
不要求队列元素实现 Comparable
接口,也可以通过构造函数传入 Comparator
来指定排序规则。
importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.PriorityBlockingQueue; publicclassPriorityBlockingQueueExample { publicstaticvoidmain(String[] args) { BlockingQueue<String>blockingQueue=newPriorityBlockingQueue<>(); // 生产者线程newThread(() -> { blockingQueue.put("Element 1"); blockingQueue.put("Element 2"); // ... }).start(); // 消费者线程newThread(() -> { try { Stringelement=blockingQueue.take(); // 处理元素 } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); } }
4. DelayQueue:
DelayQueue
是一个支持延迟元素的无界阻塞队列。元素只有在其指定的延迟时间过后才能被消费。该队列内的元素必须实现 Delayed
接口。
importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.DelayQueue; importjava.util.concurrent.Delayed; importjava.util.concurrent.TimeUnit; publicclassDelayQueueExample { staticclassDelayedElementimplementsDelayed { privateStringdata; privatelongdelayTime; DelayedElement(Stringdata, longdelayTime) { this.data=data; this.delayTime=System.currentTimeMillis() +delayTime; } publiclonggetDelay(TimeUnitunit) { returnunit.convert(delayTime-System.currentTimeMillis(), TimeUnit.MILLISECONDS); } publicintcompareTo(Delayedo) { returnLong.compare(this.delayTime, ((DelayedElement) o).delayTime); } } publicstaticvoidmain(String[] args) { BlockingQueue<DelayedElement>blockingQueue=newDelayQueue<>(); // 生产者线程newThread(() -> { blockingQueue.put(newDelayedElement("Element 1", 5000)); blockingQueue.put(newDelayedElement("Element 2", 10000)); // ... }).start(); // 消费者线程newThread(() -> { try { DelayedElementelement=blockingQueue.take(); // 处理元素 } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); } }
5. LinkedTransferQueue:
LinkedTransferQueue
是一个无界阻塞队列,它结合了 LinkedBlockingQueue
和 SynchronousQueue
的特点。它支持普通的 FIFO 队列操作,同时还可以作为一个异步传输队列。
importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.LinkedTransferQueue; publicclassLinkedTransferQueueExample { publicstaticvoidmain(String[] args) { BlockingQueue<String>blockingQueue=newLinkedTransferQueue<>(); // 生产者线程newThread(() -> { blockingQueue.offer("Element 1"); blockingQueue.offer("Element 2"); // ... }).start(); // 消费者线程newThread(() -> { try { Stringelement=blockingQueue.take(); // 处理元素 } catch (InterruptedExceptione) { e.printStackTrace(); } }).start(); } }