Java并发新构件之PriorityBlockingQueue

简介:

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import  java.util.ArrayList;
import  java.util.List;
import  java.util.Queue;
import  java.util.Random;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.PriorityBlockingQueue;
import  java.util.concurrent.TimeUnit;
 
class  PrioritizedTask  implements  Runnable, Comparable<PrioritizedTask> {
     private  static  int  counter =  1 ;
     private  final  int  priority;
     private  Random random =  new  Random( 47 );
     private  final  int  id = counter++; //这个id不是static的,因此
     protected  static  List<PrioritizedTask> sequence =  new  ArrayList<>();
     public  PrioritizedTask( int  priority) {
         this .priority = priority;
         sequence.add( this );
     }
     @Override
     public  int  compareTo(PrioritizedTask o) {
         int  val =  this .priority - o.priority;
         //higher value, higher priority
         return  val <  0  1  : (val >  0  ? - 1  0 );
     }
     @Override
     public  void  run() {
         try  {
             TimeUnit.MILLISECONDS.sleep(random.nextInt( 250 ));
         catch  (InterruptedException e) {
         }
         System.out.println( this );
     }
     @Override
     public  String toString() {
         return  String.format( "P=[%1$-3d]" , priority) +  ", ID="  + id;
     }
     public  static  class  EndFlagTask  extends  PrioritizedTask {
         private  ExecutorService exec;
         public  EndFlagTask(ExecutorService executorService) {
             super (- 1 ); //最低的优先级
             exec = executorService;
         }
         @Override
         public  void  run() {
             System.out.println( this  " calling shutdownNow()" );
             exec.shutdownNow();
         }
     }
}
 
class  PrioritizedTaskProducer  implements  Runnable {
     private  Queue<Runnable> queue;
     private  ExecutorService exec;
     public  PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
         this .queue = queue;
         this .exec = exec;
     }
     @Override
     public  void  run() {
         try  {
             //慢慢的添加高优先级的任务
             for  ( int  i =  0 ; i <  6 ; i++) {
                 TimeUnit.MILLISECONDS.sleep( 250 );
                 queue.add( new  PrioritizedTask( 9 ));  //6个优先级10
             }
             //先创建2个P=0的任务
             queue.add( new  PrioritizedTask( 0 ));
             queue.add( new  PrioritizedTask( 0 ));
             //添加低优先级的任务
             for  ( int  i =  0 ; i <  6 ; i++) { // 优先级0-5
                 queue.add( new  PrioritizedTask(i));
             }
             //添加一个结束标志的任务
             queue.add( new  PrioritizedTask.EndFlagTask(exec));
             
         catch  (InterruptedException e) {
             // TODO: handle exception
         }
         System.out.println( "Finished PrioritizedTaskProducer." );
     }
}
 
class  PrioritizedTaskConsumer  implements  Runnable {
     private  PriorityBlockingQueue<Runnable> queue;
     public  PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
         this .queue = queue;
     }
     @Override
     public  void  run() {
         try  {
             //不停的从queue里面取任务,直到exec停止。
             while (!Thread.interrupted()) {
                 //使用当前线程来跑这些任务
                 queue.take().run();
             }
         catch  (InterruptedException e) {
             
         }
         System.out.println( "Finished PrioritizedTaskConsumer." );
     }
}
 
public  final  class  PriorityBlockingQueueDemo {
     public  static  void  main(String[] args) {
         ExecutorService exec = Executors.newCachedThreadPool();
         PriorityBlockingQueue<Runnable> queue =  new  PriorityBlockingQueue<>();
         exec.execute( new  PrioritizedTaskProducer(queue, exec));
         exec.execute( new  PrioritizedTaskConsumer(queue));
     }
}

执行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
P=[ 9   ], ID= 1
P=[ 9   ], ID= 2
P=[ 9   ], ID= 3
P=[ 9   ], ID= 4
P=[ 9   ], ID= 5
Finished PrioritizedTaskProducer.
P=[ 9   ], ID= 6
P=[ 5   ], ID= 14
P=[ 4   ], ID= 13
P=[ 3   ], ID= 12
P=[ 2   ], ID= 11
P=[ 1   ], ID= 10
P=[ 0   ], ID= 7
P=[ 0   ], ID= 9
P=[ 0   ], ID= 8
P=[- 1  ], ID= 15  calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

?
1
Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

?
1
P=[ 5   ], ID= 14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

?
1
2
3
P=[ 0   ], ID= 7
P=[ 0   ], ID= 9
P=[ 0   ], ID= 8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException). 

目录
相关文章
|
1天前
|
Java API 调度
[Java并发基础]多进程编程
[Java并发基础]多进程编程
|
6天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
8天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
9天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
24 0
|
17天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
24天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
25天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
42 2
|
29天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
26 0
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0
|
1天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程