Java并发新构件之PriorityBlockingQueue

简介:

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import  java.util.ArrayList;
import  java.util.List;
import  java.util.Queue;
import  java.util.Random;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.PriorityBlockingQueue;
import  java.util.concurrent.TimeUnit;
 
class  PrioritizedTask  implements  Runnable, Comparable<PrioritizedTask> {
     private  static  int  counter =  1 ;
     private  final  int  priority;
     private  Random random =  new  Random( 47 );
     private  final  int  id = counter++; //这个id不是static的,因此
     protected  static  List<PrioritizedTask> sequence =  new  ArrayList<>();
     public  PrioritizedTask( int  priority) {
         this .priority = priority;
         sequence.add( this );
     }
     @Override
     public  int  compareTo(PrioritizedTask o) {
         int  val =  this .priority - o.priority;
         //higher value, higher priority
         return  val <  0  1  : (val >  0  ? - 1  0 );
     }
     @Override
     public  void  run() {
         try  {
             TimeUnit.MILLISECONDS.sleep(random.nextInt( 250 ));
         catch  (InterruptedException e) {
         }
         System.out.println( this );
     }
     @Override
     public  String toString() {
         return  String.format( "P=[%1$-3d]" , priority) +  ", ID="  + id;
     }
     public  static  class  EndFlagTask  extends  PrioritizedTask {
         private  ExecutorService exec;
         public  EndFlagTask(ExecutorService executorService) {
             super (- 1 ); //最低的优先级
             exec = executorService;
         }
         @Override
         public  void  run() {
             System.out.println( this  " calling shutdownNow()" );
             exec.shutdownNow();
         }
     }
}
 
class  PrioritizedTaskProducer  implements  Runnable {
     private  Queue<Runnable> queue;
     private  ExecutorService exec;
     public  PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
         this .queue = queue;
         this .exec = exec;
     }
     @Override
     public  void  run() {
         try  {
             //慢慢的添加高优先级的任务
             for  ( int  i =  0 ; i <  6 ; i++) {
                 TimeUnit.MILLISECONDS.sleep( 250 );
                 queue.add( new  PrioritizedTask( 9 ));  //6个优先级10
             }
             //先创建2个P=0的任务
             queue.add( new  PrioritizedTask( 0 ));
             queue.add( new  PrioritizedTask( 0 ));
             //添加低优先级的任务
             for  ( int  i =  0 ; i <  6 ; i++) { // 优先级0-5
                 queue.add( new  PrioritizedTask(i));
             }
             //添加一个结束标志的任务
             queue.add( new  PrioritizedTask.EndFlagTask(exec));
             
         catch  (InterruptedException e) {
             // TODO: handle exception
         }
         System.out.println( "Finished PrioritizedTaskProducer." );
     }
}
 
class  PrioritizedTaskConsumer  implements  Runnable {
     private  PriorityBlockingQueue<Runnable> queue;
     public  PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
         this .queue = queue;
     }
     @Override
     public  void  run() {
         try  {
             //不停的从queue里面取任务,直到exec停止。
             while (!Thread.interrupted()) {
                 //使用当前线程来跑这些任务
                 queue.take().run();
             }
         catch  (InterruptedException e) {
             
         }
         System.out.println( "Finished PrioritizedTaskConsumer." );
     }
}
 
public  final  class  PriorityBlockingQueueDemo {
     public  static  void  main(String[] args) {
         ExecutorService exec = Executors.newCachedThreadPool();
         PriorityBlockingQueue<Runnable> queue =  new  PriorityBlockingQueue<>();
         exec.execute( new  PrioritizedTaskProducer(queue, exec));
         exec.execute( new  PrioritizedTaskConsumer(queue));
     }
}

执行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
P=[ 9   ], ID= 1
P=[ 9   ], ID= 2
P=[ 9   ], ID= 3
P=[ 9   ], ID= 4
P=[ 9   ], ID= 5
Finished PrioritizedTaskProducer.
P=[ 9   ], ID= 6
P=[ 5   ], ID= 14
P=[ 4   ], ID= 13
P=[ 3   ], ID= 12
P=[ 2   ], ID= 11
P=[ 1   ], ID= 10
P=[ 0   ], ID= 7
P=[ 0   ], ID= 9
P=[ 0   ], ID= 8
P=[- 1  ], ID= 15  calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

?
1
Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

?
1
P=[ 5   ], ID= 14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

?
1
2
3
P=[ 0   ], ID= 7
P=[ 0   ], ID= 9
P=[ 0   ], ID= 8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException). 

目录
相关文章
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
56 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
66 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
63 2
|
1天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
3天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
3天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。