一文弄懂java中的Queue家族

简介: 一文弄懂java中的Queue家族

目录



java中Queue家族简介


简介


java中Collection集合有三大家族List,Set和Queue。当然Map也算是一种集合类,但Map并不继承Collection接口。


List,Set在我们的工作中会经常使用,通常用来存储结果数据,而Queue由于它的特殊性,通常用在生产者消费者模式中。


现在很火的消息中间件比如:Rabbit MQ等都是Queue这种数据结构的展开。


今天这篇文章将带大家进入Queue家族。


Queue接口


先看下Queue的继承关系和其中定义的方法:


image.png

image.png


Queue继承自Collection,Collection继承自Iterable。


Queue有三类主要的方法,我们用个表格来看一下他们的区别:


方法类型 方法名称 方法名称 区别
Insert add offer 两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常。offer在添加失败的时候会返回false。所以对那些有固定长度的Queue,优先使用offer方法。
Remove remove poll 如果Queue是空的情况下,remove会抛出异常,而poll会返回null。
Examine element peek 获取Queue头部的元素,但不从Queue中删除。两者的区别还是在于Queue为空的情况下,element会抛出异常,而peek返回null。


注意,因为对poll和peek来说null是有特殊含义的,所以一般来说Queue中禁止插入null,但是在实现中还是有一些类允许插入null比如LinkedList。


尽管如此,我们在使用中还是要避免插入null元素。


Queue的分类


一般来说Queue可以分为BlockingQueue,Deque和TransferQueue三种。


BlockingQueue


BlockingQueue是Queue的一种实现,它提供了两种额外的功能:


  1. 当当前Queue是空的时候,从BlockingQueue中获取元素的操作会被阻塞。
  2. 当当前Queue达到最大容量的时候,插入BlockingQueue的操作会被阻塞。


BlockingQueue的操作可以分为下面四类:


操作类型 Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable


第一类是会抛出异常的操作,当遇到插入失败,队列为空的时候抛出异常。


第二类是不会抛出异常的操作。


第三类是会Block的操作。当Queue为空或者达到最大容量的时候。


第四类是time out的操作,在给定的时间里会Block,超时会直接返回。


BlockingQueue是线程安全的Queue,可以在生产者消费者模式的多线程中使用,如下所示:


class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }
 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }
 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }


最后,在一个线程中向BlockQueue中插入元素之前的操作happens-before另外一个线程中从BlockQueue中删除或者获取的操作。


Deque


Deque是Queue的子类,它代表double ended queue,也就是说可以从Queue的头部或者尾部插入和删除元素。


同样的,我们也可以将Deque的方法用下面的表格来表示,Deque的方法可以分为对头部的操作和对尾部的操作:


方法类型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()


和Queue的方法描述基本一致,这里就不多讲了。


当Deque以 FIFO (First-In-First-Out)的方法处理元素的时候,Deque就相当于一个Queue。


当Deque以LIFO (Last-In-First-Out)的方式处理元素的时候,Deque就相当于一个Stack。


TransferQueue


TransferQueue继承自BlockingQueue,为什么叫Transfer呢?因为TransferQueue提供了一个transfer的方法,生产者可以调用这个transfer方法,从而等待消费者调用take或者poll方法从Queue中拿取数据。


还提供了非阻塞和timeout版本的tryTransfer方法以供使用。


我们举个TransferQueue实现的生产者消费者的问题。


先定义一个生产者:


@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;
    private String name;
    private Integer messageCount;
    public static final AtomicInteger messageProduced = new AtomicInteger();
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"个", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"个",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}


在生产者的run方法中,我们调用了tryTransfer方法,等待2秒钟,如果没成功则直接返回。


再定义一个消费者:


@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {
    private TransferQueue<String> transferQueue;
    private String name;
    private int messageCount;
    public static final AtomicInteger messageConsumed = new AtomicInteger();
    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }
}


在run方法中,调用了transferQueue.take方法来取消息。


下面先看一下一个生产者,零个消费者的情况:


@Test
    public void testOneProduceZeroConsumer() throws InterruptedException {
        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 5);
        exService.execute(producer);
        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }


输出结果:


[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0


可以看到,因为没有消费者,所以消息并没有发送成功。


再看下一个有消费者的情况:


@Test
    public void testOneProduceOneConsumer() throws InterruptedException {
        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 2);
        Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);
        exService.execute(producer);
        exService.execute(consumer);
        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }


输出结果:


[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2


可以看到Producer和Consumer是一个一个来生产和消费的。


总结


本文介绍了Queue接口和它的三大分类,这三大分类又有非常多的实现类,我们将会在后面的文章中再详细介绍。

相关文章
|
6月前
|
前端开发 Java
java中的Queue队列的用法
java中的Queue队列的用法
|
24天前
|
存储 缓存 安全
在Java的Map家族中,HashMap和TreeMap各具特色
【10月更文挑战第19天】在Java的Map家族中,HashMap和TreeMap各具特色。HashMap基于哈希表实现,提供O(1)时间复杂度的高效操作,适合性能要求高的场景;TreeMap基于红黑树,提供O(log n)时间复杂度的有序操作,适合需要排序和范围查询的场景。两者在不同需求下各有优势,选择时需根据具体应用场景权衡。
28 2
|
1月前
|
算法 Java API
【用Java学习数据结构系列】对象的比较(Priority Queue实现的前提)
【用Java学习数据结构系列】对象的比较(Priority Queue实现的前提)
26 1
|
3月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
52 2
|
6月前
|
安全 Java
【JAVA】在 Queue 中 poll()和 remove()有什么区别
【JAVA】在 Queue 中 poll()和 remove()有什么区别
|
3月前
|
存储 Java 索引
|
5月前
|
监控 安全 Java
java中并发Queue种类与各自API特点
java中并发Queue种类与各自API特点
39 0
|
5月前
|
存储 安全 Java
Java Queue实战:LinkedList是如何帮我轻松解决排队问题的?
【6月更文挑战第18天】在Java编程中,`LinkedList`常用于解决排队问题,如在多线程应用处理任务队列。`TaskQueue`类展示了如何使用`LinkedList`作为线程安全的`Queue&lt;Runnable&gt;`:添加任务到队列(`addTask`)和执行并移除队列首任务(`executeTask`)均通过同步方法保证并发安全性。这样确保了任务按顺序执行,提升了程序效率和稳定性。
71 8
|
5月前
|
Java 开发者
Queue大比拼:为何LinkedList能在众多Java集合中脱颖而出?
【6月更文挑战第18天】**Java的LinkedList作为队列的优势在于其双向链表实现,支持O(1)时间复杂度的首尾操作,适合作为Queue接口的实现。它也是线程不安全的,但在单线程环境下性能优越,并可通过Collections同步化。此外,它的灵活性使其也能胜任栈和双端队列的角色。**
41 5
|
5月前
|
安全 Java
Java Queue新玩法:用LinkedList打造高效队列,让你的代码飞起来!
【6月更文挑战第18天】Java集合框架中的`LinkedList`不仅是列表,还可作为高效队列。由于其在链表两端进行添加/移除操作的时间复杂度为O(1),故适合实现并发环境下的任务队列。通过案例展示了如何创建、添加任务及确保线程安全,揭示了`LinkedList`提升代码性能的秘密,特别是在多线程应用中的价值。
50 4