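The code below is an improved version of the code from Chapter 8, "Worker Thread -- wait until work arrives, then do it", of 《Java多线程设计模式》 (Java multithreading design patterns; by 结城浩 (Hiroshi Yuki), translated by 博硕文化, 中国铁道出版社, 2005).
The main changes are:
(1) Extracted the IRequest interface.
(2) Added a way to terminate the worker threads.
Participants:
Client -- the party that delegates work.
Channel -- the production line.
Request -- a production task.
WorkerThread -- a worker thread.
The Client puts Requests into the Channel. Each Worker takes a Request out of the Channel and processes it.
Code: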
Channel.java
public class Channel
{
    private static final int MAX_REQUEST = 100;
    private final IRequest[] requestQueue;
    private int tail;  // index where the next putRequest stores
    private int head;  // index where the next takeRequest reads
    private int count; // number of queued Requests
    private boolean stoped = true;
    private final WorkerThread[] threadPool;

    public Channel(int threads)
    {
        // The element type must be IRequest; no Request class is defined here.
        this.requestQueue = new IRequest[MAX_REQUEST];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
        threadPool = new WorkerThread[threads];
        for (int i = 0; i < threadPool.length; i++)
        {
            threadPool[i] = new WorkerThread(this);
        }
    }

    // synchronized so the write to stoped is visible to the workers
    public synchronized void startWorkers()
    {
        this.stoped = false;
        for (int i = 0; i < threadPool.length; i++)
        {
            threadPool[i].start();
        }
    }

    public synchronized int getCount()
    {
        return this.count;
    }

    public synchronized boolean isStoped()
    {
        return this.stoped;
    }

    public synchronized void stopWorkers()
    {
        this.stoped = true;
        // Wake workers blocked in takeRequest() so they can notice the stop.
        notifyAll();
    }

    public synchronized void putRequest(IRequest request)
    {
        while (count >= requestQueue.length)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                // ignored: re-check the condition and keep waiting
            }
        }
        requestQueue[tail] = request;
        tail = (tail + 1) % requestQueue.length;
        count++;
        notifyAll();
    }

    // Returns null once the Channel is stopped and the queue is empty.
    public synchronized IRequest takeRequest()
    {
        while (count <= 0)
        {
            if (stoped)
            {
                return null;
            }
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                // ignored: re-check the condition and keep waiting
            }
        }
        IRequest request = requestQueue[head];
        head = (head + 1) % requestQueue.length;
        count--;
        notifyAll();
        return request;
    }
}
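For comparison, the blocking behaviour that putRequest and takeRequest implement by hand (wait while the queue is full, wait while it is empty) is also available in the JDK as java.util.concurrent.ArrayBlockingQueue. The sketch below is not the article's Channel; it swaps in the library class to show the same bounded, first-in-first-out hand-off. The class name BoundedQueueSketch and the method transfer are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; not part of the original code.
class BoundedQueueSketch {
    // A producer thread puts 0..n-1 into a queue of the given capacity
    // while the calling thread takes them out; returns the values in
    // the order they were taken.
    static List<Integer> transfer(int capacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> taken = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                taken.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(transfer(2, 5));
    }
}
```

With a single producer the values come out in insertion order, which is the same guarantee the circular array with head and tail gives.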
IRequest.java
public interface IRequest
{
    void execute() throws Exception;
}
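The article does not show a concrete Request, so here is a minimal sketch of what one might look like. PrintRequest, its fields, and describe() are hypothetical names chosen for this example; only IRequest and execute() come from the code above.

```java
// IRequest as defined in the article, repeated so this sketch compiles on its own.
interface IRequest {
    void execute() throws Exception;
}

// Hypothetical request type; not part of the original code.
class PrintRequest implements IRequest {
    private final String clientName;
    private final int number;

    PrintRequest(String clientName, int number) {
        this.clientName = clientName;
        this.number = number;
    }

    // Text identifying this request.
    String describe() {
        return clientName + " request No." + number;
    }

    // Called by whichever worker thread takes this request from the Channel.
    @Override
    public void execute() {
        System.out.println(Thread.currentThread().getName() + " executes " + describe());
    }
}
```

Because workers only see the IRequest interface, any class with an execute() method can be queued without changing Channel or WorkerThread.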
WorkerThread.java
public class WorkerThread extends Thread
{
    private final Channel channel;

    public WorkerThread(Channel channel)
    {
        this.channel = channel;
    }

    public void run()
    {
        System.out.println("[Thread]: WorkerThread " + this.getName()
                + " start!");
        while (true)
        {
            // Exit only when the Channel is stopped and no Requests remain.
            if (channel.getCount() <= 0 && channel.isStoped())
            {
                System.out.println("[Thread]: WorkerThread " + this.getName()
                        + " stop!");
                // Returning from run() ends the thread. Thread.stop() is
                // deprecated and unsafe, so it is not used here.
                return;
            }
            IRequest request = channel.takeRequest();
            if (request == null)
            {
                // Defensive: no Request was handed out; re-check the stop condition.
                continue;
            }
            try
            {
                request.execute();
            }
            catch (Exception e)
            {
                System.out.println(e.getMessage());
            }
        }
    }
}
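How the Client calls it:
(1) First initialize the Channel and set the number of worker threads.
int workerCount = ......;
channel = new Channel(workerCount);
(2) Start the production line.
channel.startWorkers();
(3) Put in a Request.
channel.putRequest(aRequest);
(4) Quitting time -- stop the production line.
channel.stopWorkers();
Once the workers have processed the Requests still on the production line, each worker stops itself.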
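The four steps above can be tried end to end with the following self-contained sketch. To stay in one short file it uses a condensed stand-in for Channel (smaller queue, worker loop folded into plain Thread objects instead of a separate WorkerThread class), so treat it as an illustration of the call sequence rather than the article's exact classes. awaitWorkers(), ClientDemo, and runDemo() are hypothetical additions used only to observe the result.

```java
import java.util.concurrent.atomic.AtomicInteger;

interface IRequest {
    void execute() throws Exception;
}

// Condensed stand-in for the article's Channel (assumption: capacity 10).
class Channel {
    private final IRequest[] requestQueue = new IRequest[10];
    private int head, tail, count;
    private boolean stoped = true;
    private final Thread[] threadPool;

    Channel(int threads) {
        threadPool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            threadPool[i] = new Thread(this::workLoop, "Worker-" + i);
        }
    }

    synchronized void startWorkers() {
        stoped = false;
        for (Thread t : threadPool) t.start();
    }

    synchronized void stopWorkers() {
        stoped = true;
        notifyAll(); // wake idle workers so they can exit
    }

    // Hypothetical helper: wait until every worker has ended.
    void awaitWorkers() throws InterruptedException {
        for (Thread t : threadPool) t.join();
    }

    synchronized void putRequest(IRequest r) throws InterruptedException {
        while (count >= requestQueue.length) wait();
        requestQueue[tail] = r;
        tail = (tail + 1) % requestQueue.length;
        count++;
        notifyAll();
    }

    // Returns null once the Channel is stopped and drained.
    private synchronized IRequest takeRequest() throws InterruptedException {
        while (count <= 0) {
            if (stoped) return null;
            wait();
        }
        IRequest r = requestQueue[head];
        head = (head + 1) % requestQueue.length;
        count--;
        notifyAll();
        return r;
    }

    private void workLoop() {
        try {
            IRequest r;
            while ((r = takeRequest()) != null) {
                try {
                    r.execute();
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ClientDemo {
    // Runs steps (1) to (4) and returns how many requests were executed.
    static int runDemo(int workerCount, int requestCount) {
        AtomicInteger executed = new AtomicInteger();
        try {
            Channel channel = new Channel(workerCount);       // (1)
            channel.startWorkers();                           // (2)
            for (int i = 0; i < requestCount; i++) {
                channel.putRequest(executed::incrementAndGet); // (3)
            }
            channel.stopWorkers();                            // (4)
            channel.awaitWorkers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed " + runDemo(3, 25) + " requests");
    }
}
```

Running main prints "executed 25 requests": stopWorkers() only marks the line as closed, and the workers keep draining the queue before they return, which matches the behaviour described above.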
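This article is reposted from xiaotie's blog on 博客园 (cnblogs). Original link: http://www.cnblogs.com/xiaotie/archive/2005/11/02/267044.html. To repost it, please contact the original author yourself.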
xiaotie 集异璧实验室(GEBLAB)