基于队列的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import  java.text.SimpleDateFormat;
import  java.util.Date;
import  java.util.concurrent.ArrayBlockingQueue;
import  java.util.concurrent.ThreadPoolExecutor;
import  java.util.concurrent.TimeUnit;
 
public  class  TestThreadPool {
     //    public static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
     public  static  ArrayBlockingQueue<Runnable> queue =  new  ArrayBlockingQueue<Runnable>( 10 );
 
     public  static  void  main(String[] args) {
         for  ( int  i =  0 ; i <  3 ; i++) {
             queue.add( new  TestThread( "初始化" ));
         }
 
         final  ThreadPoolExecutor executor =  new  ThreadPoolExecutor( 2 3 15 , TimeUnit.SECONDS, queue);
         System.out.println( "getActiveCount="  + executor.getActiveCount()
                         ";getKeepAliveTime="  + executor.getKeepAliveTime(TimeUnit.SECONDS)
                         ";getCompletedTaskCount="  + executor.getCompletedTaskCount()
                         ";getCorePoolSize="  + executor.getCorePoolSize()
                         ";getLargestPoolSize="  + executor.getLargestPoolSize()
                         ";getMaximumPoolSize="  + executor.getMaximumPoolSize()
                         ";getPoolSize="  + executor.getPoolSize()
                         ";getTaskCount="  + executor.getTaskCount()
                         ";getQueue().size()="  + executor.getQueue().size()
         );
         executor.execute(queue.poll());
         System.out.println( "getActiveCount="  + executor.getActiveCount()
                         ";getKeepAliveTime="  + executor.getKeepAliveTime(TimeUnit.SECONDS)
                         ";getCompletedTaskCount="  + executor.getCompletedTaskCount()
                         ";getCorePoolSize="  + executor.getCorePoolSize()
                         ";getLargestPoolSize="  + executor.getLargestPoolSize()
                         ";getMaximumPoolSize="  + executor.getMaximumPoolSize()
                         ";getPoolSize="  + executor.getPoolSize()
                         ";getTaskCount="  + executor.getTaskCount()
                         ";getQueue().size()="  + executor.getQueue().size()
         );
 
         new  Thread( new  Runnable() {
             @Override
             public  void  run() {
                 while  ( true ) {
                     System.out.println( "getActiveCount="  + executor.getActiveCount()
                                     ";getKeepAliveTime="  + executor.getKeepAliveTime(TimeUnit.SECONDS)
                                     ";getCompletedTaskCount="  + executor.getCompletedTaskCount()
                                     ";getCorePoolSize="  + executor.getCorePoolSize()
                                     ";getLargestPoolSize="  + executor.getLargestPoolSize()
                                     ";getMaximumPoolSize="  + executor.getMaximumPoolSize()
                                     ";getPoolSize="  + executor.getPoolSize()
                                     ";getTaskCount="  + executor.getTaskCount()
                                     ";getQueue().size()="  + executor.getQueue().size()
                     );
                     try  {
                         Thread.currentThread().sleep(1000L);
                     catch  (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             }
         }).start();
 
         new  Thread( new  Runnable() {
             @Override
             public  void  run() {
                 int  i =  0 ;
                 while  ( true ) {
                     queue.add( new  TestThread( "生产者" ));
                     try  {
                         Thread.currentThread().sleep(500L);
                     catch  (InterruptedException e) {
                         e.printStackTrace();
                     }
                     i++;
                     if  (i >  10 break ;
                 }
             }
         }).start();
     }
}
 
class  TestThread  implements  Runnable {
     public  static  SimpleDateFormat sdf =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
     private  String name;         //创建者
     private  Date addDate;        //添加到队列的日期
 
     TestThread(String name) {
         this .name = name;
         this .addDate =  new  Date();
     }
 
     @Override
     public  void  run() {
         System.out.println(Thread.currentThread().getName() +
                 ":创建者="  + name +  ",创建时间="  + sdf.format(addDate) +  ",执行时间="  + sdf.format( new  Date()) +  ",当前队列大小="  + TestThreadPool.queue.size());
         try  {
             Thread.currentThread().sleep(1000L);
         catch  (InterruptedException e) {
             e.printStackTrace();
         }
     }
}


执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=0;getCorePoolSize=2;getLargestPoolSize=0;getMaximumPoolSize=3;getPoolSize=0;getTaskCount=3;getQueue().size()=3
getActiveCount=1;getKeepAliveTime=15;getCompletedTaskCount=0;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=3;getQueue().size()=2
pool-1-thread-1:创建者=初始化,创建时间=2014-08-20 12:08:19,执行时间=2014-08-20 12:08:19,当前队列大小=2
getActiveCount=1;getKeepAliveTime=15;getCompletedTaskCount=0;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=3;getQueue().size()=2
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=1;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=6;getQueue().size()=4
pool-1-thread-1:创建者=初始化,创建时间=2014-08-20 12:08:19,执行时间=2014-08-20 12:08:20,当前队列大小=4
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=2;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=8;getQueue().size()=5
pool-1-thread-1:创建者=初始化,创建时间=2014-08-20 12:08:19,执行时间=2014-08-20 12:08:21,当前队列大小=5
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=3;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=10;getQueue().size()=6
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:19,执行时间=2014-08-20 12:08:22,当前队列大小=6
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=4;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=12;getQueue().size()=7
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:19,执行时间=2014-08-20 12:08:23,当前队列大小=7
getActiveCount=1;getKeepAliveTime=15;getCompletedTaskCount=5;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=8
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:20,执行时间=2014-08-20 12:08:24,当前队列大小=8
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=6;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=7
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:20,执行时间=2014-08-20 12:08:25,当前队列大小=7
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=7;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=6
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:21,执行时间=2014-08-20 12:08:26,当前队列大小=6
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=8;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=5
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:21,执行时间=2014-08-20 12:08:27,当前队列大小=5
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=9;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=4
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:22,执行时间=2014-08-20 12:08:28,当前队列大小=4
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=10;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=3
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:22,执行时间=2014-08-20 12:08:29,当前队列大小=3
getActiveCount=1;getKeepAliveTime=15;getCompletedTaskCount=11;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=2
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:23,执行时间=2014-08-20 12:08:30,当前队列大小=2
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=12;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=1
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:23,执行时间=2014-08-20 12:08:31,当前队列大小=1
getActiveCount=1;getKeepAliveTime=15;getCompletedTaskCount=12;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=1
pool-1-thread-1:创建者=生产者,创建时间=2014-08-20 12:08:24,执行时间=2014-08-20 12:08:32,当前队列大小=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
getActiveCount=0;getKeepAliveTime=15;getCompletedTaskCount=14;getCorePoolSize=2;getLargestPoolSize=1;getMaximumPoolSize=3;getPoolSize=1;getTaskCount=14;getQueue().size()=0
 
Process  finished with exit code -1


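这是理想情况。如果生产者创建任务的速度大于消费者处理任务的速度,任务就会在队列中不断积压:有界队列最终会被填满,无界队列则会随着时间推移耗尽系统资源。要处理这种情况,应使用有界队列,并通过 RejectedExecutionHandler 指定任务无法被接收时的处理策略(注意:RejectedExecutionHandler 只在通过 executor.execute() 提交任务时才会被触发,直接向队列中添加任务不会触发它)。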


对这个例子做了一些改动,可以更清楚地看到执行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import  java.text.SimpleDateFormat;
import  java.util.Date;
import  java.util.concurrent.ArrayBlockingQueue;
import  java.util.concurrent.LinkedBlockingQueue;
import  java.util.concurrent.ThreadPoolExecutor;
import  java.util.concurrent.TimeUnit;
 
public  class  TestThreadPool {
     //    public static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
//    public static ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
//    public static ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
     public  static  LinkedBlockingQueue<Runnable> queue =  new  LinkedBlockingQueue<Runnable>();
 
     public  static  void  main(String[] args) {
         for  ( int  i =  0 ; i <  2 ; i++) {
             queue.add( new  TestThread( "初始化" ));
         }
 
         final  ThreadPoolExecutor executor =  new  ThreadPoolExecutor( 1 3 15 , TimeUnit.SECONDS, queue);
//        System.out.println("getActiveCount=" + executor.getActiveCount()
//                        + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)
//                        + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()
//                        + ";getCorePoolSize=" + executor.getCorePoolSize()
//                        + ";getLargestPoolSize=" + executor.getLargestPoolSize()
//                        + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()
//                        + ";getPoolSize=" + executor.getPoolSize()
//                        + ";getTaskCount=" + executor.getTaskCount()
//                        + ";getQueue().size()=" + executor.getQueue().size()
//        );
//        executor.execute(new Thread());
         executor.prestartCoreThread();
//        System.out.println("getActiveCount=" + executor.getActiveCount()
//                        + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)
//                        + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()
//                        + ";getCorePoolSize=" + executor.getCorePoolSize()
//                        + ";getLargestPoolSize=" + executor.getLargestPoolSize()
//                        + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()
//                        + ";getPoolSize=" + executor.getPoolSize()
//                        + ";getTaskCount=" + executor.getTaskCount()
//                        + ";getQueue().size()=" + executor.getQueue().size()
//        );
 
         new  Thread( new  Runnable() {
             @Override
             public  void  run() {
                 while  ( true ) {
                     System.out.println( "getActiveCount="  + executor.getActiveCount()
                                     ";getKeepAliveTime="  + executor.getKeepAliveTime(TimeUnit.SECONDS)
                                     ";getCompletedTaskCount="  + executor.getCompletedTaskCount()
                                     ";getCorePoolSize="  + executor.getCorePoolSize()
                                     ";getLargestPoolSize="  + executor.getLargestPoolSize()
                                     ";getMaximumPoolSize="  + executor.getMaximumPoolSize()
                                     ";getPoolSize="  + executor.getPoolSize()
                                     ";getTaskCount="  + executor.getTaskCount()
                                     ";getQueue().size()="  + executor.getQueue().size()
                     );
                     try  {
                         Thread.currentThread().sleep(200L);
                     catch  (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             }
         }).start();
 
         new  Thread( new  Runnable() {
             @Override
             public  void  run() {
                 int  i =  0 ;
                 while  ( true ) {
                     queue.add( new  TestThread( "生产者" ));
                     try  {
                         Thread.currentThread().sleep(100L);
                     catch  (InterruptedException e) {
                         e.printStackTrace();
                     }
                     i++;
                     if  (i >  100 break ;
                 }
             }
         }).start();
     }
}
 
class  TestThread  implements  Runnable {
     public  static  SimpleDateFormat sdf =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
     private  String name;         //创建者
     private  Date addDate;        //添加到队列的日期
 
     TestThread(String name) {
         this .name = name;
         this .addDate =  new  Date();
     }
 
     @Override
     public  void  run() {
         System.out.println(Thread.currentThread().getName() +
                 ":创建者="  + name +  ",创建时间="  + sdf.format(addDate) +  ",执行时间="  + sdf.format( new  Date()) +  ",当前队列大小="  + TestThreadPool.queue.size());
 
         System.out.println(TestThreadPool.queue.peek());
         try  {
             Thread.currentThread().sleep(1000L);
         catch  (InterruptedException e) {
             e.printStackTrace();
         }
     }
}