调度模式·Worker-Channel-Request

简介:
以下代码在《Java多线程设计模式》(结城浩著,博硕文化译,中国铁道出版社,2005)第8章"Worler Thread --等到工作来,来了就工作"代码基础上改进而得。

改动主要为:
(1)提炼了IRequest接口
(2)添加了终结工作线程的方法。

参与者:
Client--委托方。
Channel--生产线
Request--生产任务
WorkerThread--工作线程

委托方把Request放入Channel中。Worker从Channel中取出Request,进行加工。

代码:

Channel.java

public   class  Channel
{
    
private   static   final   int  MAX_REQUEST  =   100 ;

    
private   final  IRequest[] requestQueue;

    
private   int  tail;  //  下一个putRequest的地方

    
private   int  head;  //  下一个takeRequest的地方

    
private   int  count;  //  Request的数量
    
    
private   boolean  stoped  =   true ;

    
private   final  WorkerThread[] threadPool;

    
public  Channel( int  threads)
    {
        
this .requestQueue  =   new  Request[MAX_REQUEST];
        
this .head  =   0 ;
        
this .tail  =   0 ;
        
this .count  =   0 ;

        threadPool 
=   new  WorkerThread[threads];
        
for  ( int  i  =   0 ; i  <  threadPool.length; i ++ )
        {
            threadPool[i] 
=   new  WorkerThread( this );
        }
    }

    
public   void  startWorkers()
    {
        
this .stoped  =   false ;
        
for  ( int  i  =   0 ; i  <  threadPool.length; i ++ )
        {
            threadPool[i].start();
        }
    }
    
    
public   synchronized   int  getCount()
    {
        
return   this .count;
    }
    
    
public   synchronized   boolean  isStoped()
    {
        
return   this .stoped;
    }
    
    
public   synchronized   void  stopWorkers()
    {
        
this .stoped  =   true ;
    }

    
public   synchronized   void  putRequest(IRequest request)
    {
        
while  (count  >=  requestQueue.length)
        {
            
try
            {
                wait();
            }
            
catch  (InterruptedException e)
            {
            }
        }
        requestQueue[tail] 
=  request;
        tail 
=  (tail  +   1 %  requestQueue.length;
        count
++ ;
        notifyAll();
    }

    
public   synchronized  IRequest takeRequest()
    {
        
while  (count  <=   0 )
        {
            
try
            {
                wait();
            }
            
catch  (InterruptedException e)
            {
            }
        }
        IRequest request 
=  requestQueue[head];
        head 
=  (head  +   1 %  requestQueue.length;
        count
-- ;
        notifyAll();
        
return  request;
    }
}

IRequest.java

public   interface  IRequest
{
    
public   abstract   void  execute()  throws  Exception;
}

WorkerThread.java

public   class  WorkerThread  extends  Thread
{
    
private   final  Channel channel;

    
public  WorkerThread(Channel channel)
    {
        
this .channel  =  channel;
    }

    
public   void  run()
    {
        System.out.println(
" [Thread]: WorkerThread  "   +   this .getName()
                
+   "  start! " );

        
while  ( true )
        {
            
if  (channel.getCount()  <=   0   &&  channel.isStoped())
            {
                System.out.println(
" [Thread]: WorkerThread  "   +   this .getName()
                        
+   "  stop! " );
                stop();
            }
            
else
            {
                IRequest request 
=  channel.takeRequest();
                
try
                {
                    request.execute();
                }
                
catch  (Exception e)
                {
                    System.out.println(e.getMessage());
                }
            }
        }
    }
}

Client方调用方法:

(1) 首先初始化Channel,设定工作线程数。
int workerCount = ......;
channel = new Channel(workerCount);
(2)启动生产线
channel.startWorkers();
(3)放置Request
channel.putRequest(aRequest);
(4)下班了--停止生产线
channel.stopWorkers();

workers把目前生产线上的Request处理完后,自己Stop掉自己。

本文转自xiaotie博客园博客,原文链接http://www.cnblogs.com/xiaotie/archive/2005/11/02/267044.html如需转载请自行联系原作者


xiaotie 集异璧实验室(GEBLAB)

相关文章
|
1月前
|
消息中间件 算法 Serverless
Serverless 应用引擎操作报错合集之阿里函数计算中,总是报错“Process exited unexpectedly before completing request (duration: 0ms, maxMemoryUsage: 0.00MB)”如何解决
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
API
RT-Thread快速入门-中断管理
RT-Thread快速入门-中断管理
44 0
|
11月前
|
设计模式 并行计算 安全
并发编程模式(future,Master-Worker,生产者消费者模式)
在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直等待到这个答复收到时再去做别的事情,但如果利用Future设计模式就无需等待答复的到来,在等待答复的过程中可以干其他事情。
|
调度 数据库
【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )
【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )
475 0
|
分布式计算 网络协议 Scala
Spark worker 注册功能完成 | 学习笔记
快速学习 Spark worker 注册功能完成
144 0
Spark worker 注册功能完成 | 学习笔记
|
分布式计算 Scala Spark
Spark worker 定时更新心跳 | 学习笔记
快速学习 Spark worker 定时更新心跳
160 0
Spark worker 定时更新心跳 | 学习笔记
|
网络协议 Linux Scala
指定 Master 与 Worker 的启动参数 | 学习笔记
快速学习指定 Master 与 Worker 的启动参数
107 0
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
259 0
|
分布式计算 Spark
SparkStreming:使用Checkpoint创建StreamingContext修改executor-cores、executor-memory等资源信息不生效。
在使用SparkStreaming时,使用StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)创建StreamingContext。
1043 0

热门文章

最新文章