写一个简单的工作流(四)资源的处理

简介:
  昨天晚上搞到深夜,终于将资源模块搞定。到今天已经完成的功能包括:
1.四种基本路由:顺序、选择、并行、循环
2.流程定义文件和系统配置文件的读取和解析
3.使用内存作为流程数据和案例数据存储的MemoryWorkFlowDAO的开发
4.资源模块的开发
5.并发情况下的正确性测试等

    计划中的功能:
1.一个GUI的流程定义工具,这个不急,也还没想好用什么做,web还是桌面?
2.各个数据库版本的WorkFlowDAO的开发,将流程数据和案例数据保存在数据库中。
3.更多的测试和example试验。

    回到资源这个概念,工作流中工作项(work item)的由资源来驱动的,这个资源(resource)可能是用户、角色、定时时间或者某个事件消息。在标准petri网中,工作项对应于transition(变迁),变迁都是自动的,不需要所谓资源来驱动,显然,这与工作流系统不同。具体到insect workflow(我取的名字,小巧之意),每个transition都有一个resource,用于驱动自身的firing,所有的resource都实现Resource接口:
public   interface  Resource  extends  Serializable {

    
public   void  start(Transition transition, Token token, Object args);
           
    
public  ResourceType getType();

    
public   long  getId();

}
    每个资源都有一个类型,以及这个类型中独一无二的id,start方法用于驱动transtion的firing。一般情况下,你不需要实现这个接口,只要继承这个接口的抽象实现类AbstractResource,AbstractResource的start方法默认实现是首先调用模板方法doAction(稍后解释),然后检查触发条件,如果通过就直接调用transition的fire方法:
public   abstract   class  AbstractResource  implements  Resource {
         
    
public   void  start(Transition transition, Token token, Object args) {
        doAction(transition, token, args);

        
if  (transition.getCondition()  !=   null
                
&&   ! transition.getCondition().check(token))
            
throw   new  ConditionException(transition.getName()
                    
+   "  transition没有满足触发条件 " );
        transition.fire(token, args);
    }
    
public   abstract   void  doAction(Transition transition, Token token,
            Object args)
;
       
}

    Transtion类的fire方法有三个操作组成:从输入库所移走token,往输出库所放入token,回调handler:
    public   void  fire(Token token, Object args) {
        removeTokenFromInputs(token);
        addTokenToOutputs(token);
        invokeHandler(token, args);
    }
    那么具体的资源显然要实现AbstractResource中的doAction抽象方法,系统内置了五种资源:自动资源(AutoResource)、用户(User)、用户组(Group)、定时器(TimerResource)和事件监听器(ObserverResource)。显然,AutoResource、User和Group的doAction方法不需要做任何事情:
public   class  User  extends  AbstractResource {
    
protected  Group group;
            
    @Override
    
public   void  doAction(Transition transition, Token token, Object arg) {
    }

}
   
    而TimerResource就需要做特殊处理了,比如我们要达到这样的效果:节点1状态已经处于就绪,可以被触发,可我们希望在就绪后延迟半分钟再触发,或者在晚上10点触发等等。这样的定时需求很常见,我采用了jdk5引入的ScheduledExecutorService来处理。系统中启动这样一个线程池,每个类似上面的请求都提交给这个线程池来处理,那么TimerResource就需要进行相应的修改:
public   abstract   class  TimerResource  extends  AbstractResource {

    
protected   int  pool_size;

    
protected   static  ScheduledExecutorService scheduledExecutorService;

    @Override
    
public   long  getId() {
        
//  TODO Auto-generated method stub
         return  Common.TIMER_RESOURCE_ID;
    }

    
public  TimerResource() {
        
this .pool_size  =   5 ;
        scheduledExecutorService 
=  Executors.newScheduledThreadPool(pool_size);
    }

    
public   static   void  shutdownPool() {
        
if  (scheduledExecutorService  !=   null )
            scheduledExecutorService.shutdown();
    }

    
public   final   void  start(Transition transition, Token token, Object args)
            
throws  InterruptedException {
        
if  (transition.getCondition()  !=   null
                
&&   ! transition.getCondition().check(token))
            
throw   new  ConditionException(transition.getName()
                    
+   "  transition没有满足触发条件 " );
        transition.removeTokenFromInputs(token);
        doAction(transition, token, args);
    }

    
protected   class  ChangeRunner  implements  Runnable {
        
private  Transition transition;

        
private  Token token;

        
private  Object[] args;

        
public  ChangeRunner(Transition transition, Token token, Object args) {
            
this .transition  =  transition;
            
this .token  =  token;
            
this .args  =  args;
        }

        
public   void  run() {
            
if  (transition.getCondition()  !=   null
                    
&&   ! transition.getCondition().check(token))
                
throw   new  ConditionException(transition.getName()
                        
+   "  transition没有满足触发条件 " );
            transition.addTokenToOutputs(token);
            Object real_args[] 
=   new  Object[args.length  -   2 ];
            
for  ( int  i  =   0 ; i  <  real_args.length; i ++ )
                real_args[i] 
=  args[i  +   2 ];
            transition.invokeHandler(token, real_args);
            
try  {
                
//  回调
                ((WorkFlowAlgorithm) args[ 1 ]).enabledTraversing(token
                        .getWorkFlow());
                ((WorkFlowManager) args[
0 ]).doAction(token.getId());

            } 
catch  (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
    注意到,start方法不再是直接调用transition的fire方法,而仅仅是进行了第一步操作:移除输入库所的place防止重复提交。后两步操作都延迟到了提交给线程池的任务中,也就是代码中的 ChangeRunner类中的run方法。例如TimerResource的子类DelayTimerResource用于处理延迟的触发,doAction就像这样:
public   class  DelayTimerResource  extends  TimerResource {
    
    @Override
    
public   void  doAction(Transition transition, Token token, Object args) {
        scheduledExecutorService.schedule(
new  ChangeRunner(transition, token,
                args), 
this .delay,  this .timeUnit);

    }
}
    延迟的时间,时间单位这些信息都可以在流程定义文件中设置。事件监听器资源与此类似,ObserverResource实现了java.util.Observer接口,往输出库所放入token和回调handler两步操作被放在了update方法中提供给Subject回调。
   文章转自庄周梦蝶  ,原文发布时间2007-10-13
目录
相关文章
|
17天前
工作流介绍
工作流介绍
|
10月前
工作流
工作流
126 0
|
6月前
|
弹性计算 运维 持续交付
基于资源编排服务(ROS)实现存量资源的IaC化
如果您需要一种简单而有效的方法来管理大量云资源并实现自动化部署,推荐使用阿里云的资源编排服务ROS(Resource Orchestration Service)。ROS能够将存量资源转化为IaC(基础设施即代码),通过资源场景创建、模版生成和资源栈导入等功能,实现资源的统一管理和自动化部署。这不仅提高了资源管理的效率,还降低了成本。如果您想了解如何更轻松地管理云资源并加速部署流程,ROS是一个值得深入了解的工具。
79 1
powerjob配置工作流
powerjob工作流内配置依赖关系及判断节点
785 0
powerjob配置工作流
|
存储 弹性计算 监控
浅析数据工作流Prefect
简述 Prefect 是一种新的工作流管理系统,专为现代基础设施而设计,由开源的 Prefect Core 工作流引擎提供支持。 用户只需将任务组织成流程,Prefect 负责其余的工作,可让您非常容易使用数据工作流并添加重试、日志记录、动态映射、缓存、失败通知等语义。
|
开发工具 git
GitFlow工作流
GitFlow工作流
|
存储 弹性计算 运维
资源编排ROS之资源场景
# 背景 [资源编排服务](https://help.aliyun.com/document_detail/28852.html)(Resource Orchestration Service, 简称ROS)是阿里云提供的一项简化云计算资源管理的服务。您可以遵循ROS定义的模板规范编写资源栈模板,在模板中定义所需的云计算资源(例如ECS实例、RDS数据库实例)、资源间的依赖关系等。ROS的编排引擎
209 0
资源编排ROS之资源场景
|
弹性计算 运维 关系型数据库
资源编排ROS之资源栈组StackGroup
## 背景 [资源编排服务](https://help.aliyun.com/document_detail/28852.html)(Resource Orchestration Service, 简称 ROS)是阿里云提供的一项简化云计算资源管理的服务。您可以遵循 ROS 定义的模板规范编写资源栈模板,在模板中定义所需的云计算资源(例如 ECS 实例、RDS 数据库实例)、资源间的依赖关系
894 0
|
消息中间件 JSON 弹性计算
资源编排ROS之自定制资源(基础篇)
本文介绍资源编排ROS的基础知识配置。
1788 1
资源编排ROS之自定制资源(基础篇)