前序:

   Thread-Per-Message Pattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作。它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现。该线程模式主要包括三个部分:

   1,Request参与者(委托人),也就是消息发送端或者命令请求端

   2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程。

   3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动。

   由于常规调用一个方法后,必须等待该方法完全执行完毕后才能继续执行下一步操作,而利用线程后,就不必等待具体任务执行完毕,就可以马上返回继续执行下一步操作。

背景:

   由于在Thread-Per-Message Pattern中对于每一个请求都会生成启动一个线程,而线程的启动是很花费时间的工作,所以鉴于此,提出了Worker Thread,重复利用已经启动的线程。

线程池:

   Worker Thread,也称为工人线程或背景线程,不过一般都称为线程池。该模式主要在于,事先启动一定数目的工作线程。当没有请求工作的时候,所有的工人线程都会等待新的请求过来,一旦有工作到达,就马上从线程池中唤醒某个线程来执行任务,执行完毕后继续在线程池中等待任务池的工作请求的到达。

   任务池:主要是存储接受请求的集合,利用它可以缓冲接受到的请求,可以设置大小来表示同时能够接受最大请求数目。这个任务池主要是供线程池来访问。

   线程池:这个是工作线程所在的集合,可以通过设置它的大小来提供并发处理的工作量。对于线程池的大小,可以事先生成一定数目的线程,根据实际情况来动态增加或者减少线程数目。线程池的大小不是越大越好,线程的切换也会耗时的。

   存放池的数据结构,可以用数组也可以利用集合,在集合类中一般使用Vector,这个是线程安全的。

  Worker Thread的所有参与者:

   1,Client参与者,发送Request的参与者

   2,Channel参与者,负责缓存Request的请求,初始化启动线程,分配工作线程

   3,Worker参与者,具体执行Request的工作线程

   4,Request参与者

注意:将在Worker线程内部等待任务池非空的方式称为正向等待

     将在Channel线程提供Worker线程来判断任务池非空的方式称为反向等待

线程池实例1:

   利用同步方法来实现,使用数组来作为任务池的存放数据结构。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用反向等待来判断任务池的非空状态。

Channel参与者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package  whut.threadpool;
//用到了生产者与消费者模式
//生成线程池,接受客户端线程的请求,找到一个工作线程分配该客户端请求
public  class  Channel {
     private  static  final  int  MAX_REQUEST =  100 ; // 并发数目,就是同时可以接受多少个客户端请求
     //利用数组来存放请求,每次从数组末尾添加请求,从开头移除请求来处理
     private  final  Request[] requestQueue; // 存储接受客户线程的数目
     private  int  tail; //下一次存放Request的位置
     private  int  head; //下一次获取Request的位置
     private  int  count; // 当前request数量
     private  final  WorkerThread[] threadPool; // 存储线程池中的工作线程
     // 运用数组来存储
     public  Channel( int  threads) {
         this .requestQueue =  new  Request[MAX_REQUEST];
         this .head =  0 ;
         this .head =  0 ;
         this .count =  0 ;
         threadPool =  new  WorkerThread[threads];
         // 启动工作线程
         for  ( int  i =  0 ; i < threadPool.length; i++) {
             threadPool[i] =  new  WorkerThread( "Worker-"  + i,  this );
         }
     }
     public  void  startWorkers() {
         for  ( int  i =  0 ; i < threadPool.length; i++) {
             threadPool[i].start();
         }
     }
     // 接受客户端请求线程
     public  synchronized  void  putRequest(Request request) {
         // 当Request的数量大于或等于同时接受的数目时候,要等待
         while  (count >= requestQueue.length)
             try  {
                 wait();
             catch  (InterruptedException e) {
             }
         requestQueue[tail] = request;
         tail = (tail +  1 ) % requestQueue.length;
         count++;
         notifyAll();
     }
     // 处理客户端请求线程
     public  synchronized  Request takeRequest() {
         while  (count <=  0 )
             try  {
                 wait();
             catch  (InterruptedException e) {
             }
         Request request = requestQueue[head];
         head = (head +  1 ) % requestQueue.length;
         count--;
         notifyAll();
         return  request;
     }
}

客户端请求线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package  whut.threadpool;
import  java.util.Random;
//向Channel发送Request请求的
public  class  ClientThread  extends  Thread{
     private  final  Channel channel;
     private  static  final  Random random= new  Random();
                                                               
     public  ClientThread(String name,Channel channel)
     {
         super (name);
         this .channel=channel;
     }
     public  void  run()
     {
         try {
             for ( int  i= 0 ; true ;i++)
             {
                 Request request= new  Request(getName(),i);
                 channel.putRequest(request);
                 Thread.sleep(random.nextInt( 1000 ));
             }
         } catch (InterruptedException e)
         {
         }
     }
}

工作线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package  whut.threadpool;
//具体工作线程
public  class  WorkerThread  extends  Thread{
                                                      
     private  final  Channel channel;
     public  WorkerThread(String name,Channel channel)
     {
       super (name);
       this .channel=channel;
     }
                                                      
     public  void  run()
     {
         while ( true )
         {
             Request request=channel.takeRequest();
             request.execute();
         }
     }
}

线程池实例2:

   利用同步块来处理,利用Vector来存储客户端请求。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用正向等待来判断任务池的非空状态。

   这种实例,可以借鉴到网络ServerSocket处理用户请求的模式中,有很好的扩展性与实用性。

   利用Vector来存储,依旧是每次集合的最后一个位置添加请求,从开始位置移除请求来处理

Channel参与者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package  whut.threadpool2;
import  java.util.Vector;
/*
  * 这个主要的作用如下
  * 0,缓冲客户请求线程(利用生产者与消费者模式)
  * 1,存储客户端请求的线程
  * 2,初始化启动一定数量的线程
  * 3,主动来唤醒处于任务池中wait set的一些线程来执行任务
  */
public  class  Channel {
     public  final  static  int  THREAD_COUNT= 4 ;
     public  static  void  main(String[] args) {
       //定义两个集合,一个是存放客户端请求的,利用Vector,
       //一个是存储线程的,就是线程池中的线程数目
                             
       //Vector是线程安全的,它实现了Collection和List
       //Vector 类可以实现可增长的对象数组。与数组一样,
       //它包含可以使用整数索引进行访问的组件。但Vector 的大小可以根据需要增大或缩小,
       //以适应创建 Vector 后进行添加或移除项的操作。
       //Collection中主要包括了list相关的集合以及set相关的集合,Queue相关的集合
       //注意:Map不是Collection的子类,都是java.util.*下的同级包
       Vector pool= new  Vector();
       //工作线程,初始分配一定限额的数目
       WorkerThread[] workers= new  WorkerThread[THREAD_COUNT];
                          
       //初始化启动工作线程
       for ( int  i= 0 ;i<workers.length;i++)
       {
           workers[i]= new  WorkerThread(pool);
           workers[i].start();
       }
                           
       //接受新的任务,并且将其存储在Vector中
       Object task= new  Object(); //模拟的任务实体类
       //此处省略具体工作
       //在网络编程中,这里就是利用ServerSocket来利用ServerSocket.accept接受一个Socket从而唤醒线程
                           
       //当有具体的请求达到
       synchronized (pool)
       {
           pool.add(pool.size(), task);
           pool.notifyAll(); //通知所有在pool wait set中等待的线程,唤醒一个线程进行处理
       }
       //注意上面这步骤添加任务池请求,以及通知线程,都可以放在工作线程内部实现
       //只需要定义该方法为static,在方法体用同步块,且共享的线程池也是static即可
                           
       //下面这步,可以有可以没有根据实际情况
       //取消等待的线程
       for ( int  i= 0 ;i<workers.length;i++)
       {
           workers[i].interrupt();
       }
     }
}

工作线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package  whut.threadpool2;
import  java.util.List;
public  class  WorkerThread  extends  Thread {
     private  List pool; //任务请求池
     private  static  int  fileCompressed= 0 ; //所有实例共享的
                     
     public  WorkerThread(List pool)
     {
           this .pool=pool; 
     }
                     
     //利用静态synchronized来作为整个synchronized类方法,仅能同时一个操作该类的这个方法
     private  static  synchronized  void  incrementFilesCompressed()
     {
         fileCompressed++;
     }
                     
     public  void  run()
     {
         //保证无限循环等待中
         while ( true )
         {
             //共享互斥来访问pool变量
             synchronized (pool)
             {
                 //利用多线程设计模式中的
                 //Guarded Suspension Pattern,警戒条件为pool不为空,否则无限的等待中
                 while (pool.isEmpty())
                 {
                     try {
                         pool.wait(); //进入pool的wait set中等待着,释放了pool的锁
                     } catch (InterruptedException e)
                     {
                     }
                 }
                 //当线程被唤醒,需要重新获取pool的锁,
                 //再次继续执行synchronized代码块中其余的工作
                 //当不为空的时候,继续再判断是否为空,如果不为空,则跳出循环
                 //必须先从任务池中移除一个任务来执行,统一用从末尾添加,从开始处移除
                                 
                 pool.remove( 0 ); //获取任务池中的任务,并且要进行转换
             }
             //下面是线程所要处理的具体工作
         }
     }
}