问题引出
由于实现消费者-生产者模型,每一次实现都比较麻烦,比如sychronized的同步处理,或者通过锁实现。这些实现起来都比较繁琐,为了简单就能实现这种模型,JUC提供了阻塞队列接口:BlockingQueue(单端阻塞队列)和BlockingDeque(双端阻塞队列)
一.单端阻塞队列(BlockingQueue)
原理:通过使用FIFO模式处理的集合结构
什么是FIFO?
FIFO(First-In, First-Out)是一种常见的处理数据的方式,也被称为先进先出模式。在FIFO模式中,首先进入队列的数据首先被处理,而最后进入队列的数据最后被处理。
可以将FIFO模式理解为排队等候的情景,比如在超市的收银台,顾客按照先后顺序排队结账。当一个顾客结完账离开后,下一个顾客才能开始结账。这就是FIFO模式的处理顺序。
在计算机科学中,FIFO模式通常用于数据缓冲区、队列和调度算法等场景。例如,在操作系统中,进程调度算法可以使用FIFO模式,根据进程到达的先后顺序来决定执行顺序;在网络通信中,消息队列可以使用FIFO模式确保消息按照发送的先后顺序被接收和处理。
单端阻塞队列BlockQueue的常用方法:
方法 | 描述 |
put(item) |
将指定的项放入队列中,如果队列已满则阻塞,直到有空间可用 |
take() |
从队列中获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
offer(item) |
尝试将指定的项放入队列中,如果队列已满则立即返回false,否则返回true |
poll(timeout) |
从队列中获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
peek() |
返回队列中的第一个项,但不对队列进行修改,如果队列为空则返回null |
size() |
返回队列中当前的项数 |
isEmpty() |
检查队列是否为空 |
isFull() |
检查队列是否已满 |
clear() |
清空队列,移除所有的项 |
单端阻塞队列接口BlockingQueue提供多个子类ArrayBlockingQueue(数组结构)、LinkedBlockingQueue(链表单端阻塞队列)、PriorityBlockingQueue(优先级阻塞队列)、SynchronousQueue(同步队列)
ArrayBlockingQueue
案例代码:
上述代码修改后如下
package Example2129; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class javaDemo { public static void main(String[] args){ // 创建对象和资源量 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); // 创建一个包含各种美食的String数组 String[] foods = {"披萨","汉堡", "寿司", "墨西哥炸玉米卷", "牛排", "意大利面", "烤鸭", "富士山寿司", "印度咖喱", "巴西烤肉",}; int id; // 两个厨师 for (int i=0;i<2;i++){ id = i; new Thread(()->{ for (int j=0;j<10;j++){ try { // 模拟做菜时间 TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"已经做完菜肴"+foods[j]+"并端上座子"); queue.put(foods[j]); }catch (Exception e){ e.printStackTrace(); } } },"厨师"+id).start(); } for (int i=0;i<10;i++){ id = i; new Thread(()->{ for (int j=0;j<2;j++){ try { // 模拟客人吃饭的时间 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"享用完"+queue.take()+"这道菜"); }catch (Exception e){ e.printStackTrace(); } } },"客人"+i).start(); } } }
编辑
注意:阻塞队列虽然解决了数据存满则线程等待的情况,但是并没有解决线程并发的问题
LinkedBlockingQueue
案例代码:
package Example2130; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class javaDemo { public static void main(String[] args) { // 设置容量 BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); Random random = new Random(); new Thread(()->{ while (true){ try { if (queue.size()==2){ System.out.println("队列已满"); TimeUnit.SECONDS.sleep(1); }else { TimeUnit.SECONDS.sleep(random.nextInt(3)); System.out.println("存入数据"); queue.put("存入数据"); } } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); new Thread(()->{ while (true){ try { TimeUnit.SECONDS.sleep(random.nextInt(3)); if (queue.isEmpty()){ System.out.println("队列空了啊"); TimeUnit.SECONDS.sleep(1); }else { System.out.println("取出数据"); queue.take(); } } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); } }
编辑
PriorityBlockingQueue
package Example2131; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; public class javaDemo { public static void main(String[] args) { BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(); Random random = new Random(); new Thread(()->{ try { for (int i=0;i<5;i++){ queue.put(random.nextInt(10)); } } catch (InterruptedException e) { throw new RuntimeException(e); } }).start(); new Thread(()->{ try { for (int i=0;i<5;i++){ System.out.println("取出数据"+queue.take()); } } catch (InterruptedException e) { throw new RuntimeException(e); } }).start(); } }
编辑
PriorityBlockingQueue
的特点是:
- 元素按照优先级进行排序。在示例中,较小的数字具有较高的优先级。
- 插入和移除操作的时间复杂度为O(logN),其中N为队列中的元素个数。
SynchronousQueue
package Example2132; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class javaDemo { public static void main(String[] args){ // 创建对象和资源量 BlockingQueue<String> queue = new SynchronousQueue<>(); // 创建一个包含各种美食的String数组 String[] foods = {"披萨","汉堡", "寿司", "墨西哥炸玉米卷", "牛排", "意大利面", "烤鸭", "富士山寿司", "印度咖喱", "巴西烤肉",}; int id; // 两个厨师 new Thread(()->{ for (int j=0;j<10;j++){ try { // 模拟做菜时间 TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"已经做完菜肴"+foods[j]+"并端上座子"); queue.put(foods[j]); }catch (Exception e){ e.printStackTrace(); } } },"厨师").start(); for (int i=0;i<10;i++){ id = i; new Thread(()->{ try { // 模拟客人吃饭的时间 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"享用完"+queue.take()+"这道菜"); }catch (Exception e){ e.printStackTrace(); } },"客人"+i).start(); } } }
SynchronousQueue
的特点是:
- 队列没有容量,每次插入操作必须等待对应的删除操作,反之亦然。
- 插入和删除操作是成对的,即一个元素的插入必须等待其被消费取出。
实现子类之间的区别:
ArrayBlockingQueue
(数组结构阻塞队列):
- 基于数组实现的有界队列,具有固定容量。
- 具有公平(FIFO)和非公平(默认)两种策略的可选择性。
- 内部使用单个锁来实现线程安全。
- 插入和移除元素的时间复杂度为O(1)。
LinkedBlockingQueue
(链表单端阻塞队列):
- 基于链表实现的可选有界或无界队列。
- 默认情况下是无界的,但可以指定最大容量来创建有界队列。
- 内部使用两个锁来实现线程安全,一个用于插入操作,一个用于移除操作。
- 插入和移除元素的时间复杂度为O(1)。
PriorityBlockingQueue
(优先级阻塞队列):
- 基于堆实现的无界优先级队列。
- 元素按照优先级进行排序,优先级通过元素的自然顺序或者自定义比较器进行确定。
- 内部不允许存储
null
元素。 - 插入和移除元素的时间复杂度为O(logN),其中N为队列中的元素个数。
SynchronousQueue
(同步队列):
- 一个没有缓冲区的阻塞队列,用于线程之间直接传输元素。
- 每个插入操作必须等待相应的移除操作,反之亦然。
- 队列本身不存储元素,仅用于线程之间的数据传递。
- 插入和移除操作通常具有较高的可伸缩性性能。
二.双端阻塞队列(BlockingDeque)
BlockingDeque ,可以实现FIFO与FILO操作
什么是FILO?
- FILO(First-In, Last-Out)是一种数据处理方式,也被称为后进先出模式。在FILO模式中,最后进入的数据会首先被处理,而最先进入的数据会最后被处理。
- 可以将FILO模式理解为堆叠物品的情景,比如在一个书架上放置书籍。当我们将一本新书放在书架上时,它会被放在已有书籍的顶部,因此最后放置的书会处于最上方。当我们需要取出一本书时,会优先从顶部取出最后放置的那本书。这符合FILO模式的处理顺序。
- 在计算机科学中,FILO模式常用于栈(Stack)数据结构的操作。栈是一种具有特定数据插入和删除规则的数据结构,最后插入的数据会成为栈顶,最先插入的数据会成为栈底。当需要访问或移除数据时,我们通常会先操作栈顶的数据。
- 总之,FILO模式即后进先出模式,用于保持数据处理顺序的一种方式。类似于堆叠物品或栈数据结构,最后进入的数据会首先被处理,而最先进入的数据会最后被处理。
BlockingDeque 的常用方法:
方法 | 描述 |
addFirst(item) |
将指定的项添加到双端队列的开头,如果队列已满则抛出异常 |
addLast(item) |
将指定的项添加到双端队列的末尾,如果队列已满则抛出异常 |
offerFirst(item) |
尝试将指定的项添加到双端队列的开头,如果队列已满则立即返回false,否则返回true |
offerLast(item) |
尝试将指定的项添加到双端队列的末尾,如果队列已满则立即返回false,否则返回true |
putFirst(item) |
将指定的项放入双端队列的开头,如果队列已满则阻塞,直到有空间可用 |
putLast(item) |
将指定的项放入双端队列的末尾,如果队列已满则阻塞,直到有空间可用 |
pollFirst(timeout) |
从双端队列的开头获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
pollLast(timeout) |
从双端队列的末尾获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
takeFirst() |
从双端队列的开头获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
takeLast() |
从双端队列的末尾获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
getFirst() |
返回双端队列的开头项,但不对队列进行修改,如果队列为空则抛出异常 |
getLast() |
返回双端队列的末尾项,但不对队列进行修改,如果队列为空则抛出异常 |
peekFirst() |
返回双端队列的开头项,但不对队列进行修改,如果队列为空则返回null |
peekLast() |
返回双端队列的末尾项,但不对队列进行修改,如果队列为空则返回null |
size() |
返回双端队列中当前的项数 |
isEmpty() |
检查双端队列是否为空 |
clear() |
清空双端队列,移除所有的项 |
双端阻塞队列只有一个实现的子类LinkedBlockingDeque
案例代码:
package Example2133; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class javaDemo { public static void main(String[] args) { BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(); new Thread(()->{ for (int i=0;i<10;i++){ try { deque.putFirst(i); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); new Thread(()->{ while (true){ try { TimeUnit.SECONDS.sleep(1); System.out.println(deque.takeLast()); } catch (InterruptedException e) { throw new RuntimeException(e); } if (deque.isEmpty()){ System.out.println("队列空了啦"); break; } } }).start(); } }
编辑
可以看到双端情况下可以将数据放在头或者尾,获取也可以获取头和尾
三.延迟队列(DelayQueue)
在JUC中提供自动弹出数据延迟的队列DelayQueue,该类属于BlockingQueue的实现子类。如果是创建类对象插入到延迟队列中的话,类需要继承Delayed,并且覆写 compareTo()和getDelay()方法
常用方法:
方法名 | 描述 |
enqueue(item, delay) |
将指定的 item 入队,并在 delay 毫秒后执行。 |
dequeue() |
出队并返回最早的延迟任务。 |
getDelay(item) |
返回指定 item 的剩余延迟时间(以毫秒为单位),如果 item 已经过期则返回负数。 |
remove(item) |
从队列中移除指定的 item 。 |
size() |
返回队列中延迟任务的数量。 |
isEmpty() |
判断队列是否为空。 |
clear() |
清空队列,移除所有的延迟任务。 |
getExpiredItems(now) |
返回所有已过期的任务,并从队列中移除它们。 |
getNextExpiringItem() |
返回下一个即将过期的任务,但不从队列中移除它。 |
案例代码:
package Example2134; import org.jetbrains.annotations.NotNull; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; class Student implements Delayed { private String name; // 设置停留时间 private long delay; // 设置离开时间 private long expire; Student(String name, long delay , TimeUnit unit){ this.name=name; this.delay = TimeUnit.MILLISECONDS.convert(delay,unit); this.expire = System.currentTimeMillis()+this.delay; } @Override public String toString() { return this.name+"同学已经到达预计停留的时间"+TimeUnit.SECONDS.convert(this.delay,TimeUnit.MILLISECONDS)+"秒,已经离开了"; } // 延迟时间计算 @Override public long getDelay(@NotNull TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } // 队列弹出计算 @Override public int compareTo(@NotNull Delayed o) { return (int) (this.delay-this.getDelay(TimeUnit.MILLISECONDS)); } } public class javaDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<Student> students = new DelayQueue<Student>(); students.put(new Student("黄小龙",3,TimeUnit.SECONDS)); students.put(new Student("张三",1,TimeUnit.SECONDS)); students.put(new Student("李四",5,TimeUnit.SECONDS)); while (!students.isEmpty()){ Student stu = students.take(); System.out.println(stu); TimeUnit.SECONDS.sleep(1); } } }
编辑