坚持学习WF(6):开发可复用的宿主程序

简介:

我们之前写工作流宿主程序一般都是直接写在program.cs文件里,这样复用性比较差。我们就简单的写两个类,这两个类主要实现的是对WorkflowInstance和WorkflowRuntime的封装。我们以后的例子都会使用这两个类。

第一个类是WorkflowInstanceWrapper,代码如下:


 
 
[Serializable]
     public  class WorkflowInstanceWrapper
    {
         private WorkflowInstance _workflowInstance;
         private ManualResetEvent _waitHandle =  new ManualResetEvent( false);
         private Dictionary<String, Object> _outputParameters=  new Dictionary
                < stringobject>();
         private Exception _exception;
         private String _reasonSuspended = String.Empty;

         public WorkflowInstanceWrapper(WorkflowInstance instance)
        {
            _workflowInstance = instance;
        }        
         public Guid Id
        {
             get
            {
                 if (_workflowInstance !=  null)
                    return _workflowInstance.InstanceId;
                 else
                    return Guid.Empty;
            }
        }        
         public Dictionary<String, Object> OutputParameters
        {
             get {  return _outputParameters; }
             set { _outputParameters = value; }
        }        
         public ManualResetEvent WaitHandle
        {
             get {  return _waitHandle; }
             set { _waitHandle = value; }
        }        
         public Exception Exception
        {
             get {  return _exception; }
             set { _exception = value; }
        }        
         public String ReasonSuspended
        {
             get {  return _reasonSuspended; }
             set { _reasonSuspended = value; }
        }        
         public WorkflowInstance WorkflowInstance
        {
             get {  return _workflowInstance; }
        }        
         public  void StopWaiting()
        {
            _waitHandle.Set();
        }
    }

1._exception,_reasonSuspended:表示当工作流非正常终止或挂起时的相关信息。
2. OutputParameters:用来接收工作流的输出参数,工作流运行时引擎将引发 WorkflowCompleted事件。
工作流运行时引擎将在WorkflowCompletedEventArgs 中传入工作流的所有输出参数。 这些参数包括工作
流的 out 和 ref 参数。
 

第二类是WorkflowManager代码如下:  

  public  class WorkflowRuntimeManager : IDisposable
    {
         private WorkflowRuntime _workflowRuntime;
         private Dictionary<Guid, WorkflowInstanceWrapper> _workflows
                               =  new Dictionary<Guid, WorkflowInstanceWrapper>();
      
         public WorkflowRuntimeManager(WorkflowRuntime instance)
        {
            _workflowRuntime = instance;
             if (instance ==  null)
            {
                 throw  new NullReferenceException(
                     " A non-null WorkflowRuntime instance is required ");
            }            
            SubscribeToEvents(instance);
        }

      
         public WorkflowInstanceWrapper StartWorkflow(Type workflowType,
            Dictionary<String, Object> parameters)
        {
            WorkflowInstance instance = _workflowRuntime.CreateWorkflow(
                workflowType, parameters);
            WorkflowInstanceWrapper wrapper
                = AddWorkflowInstance(instance);
            instance.Start();
             return wrapper;
        }
        
         public WorkflowInstanceWrapper StartWorkflow(String markupFileName,
            String rulesMarkupFileName,
            Dictionary<String, Object> parameters)
        {
            WorkflowInstance instance =  null;
            WorkflowInstanceWrapper wrapper =  null;
            XmlReader wfReader =  null;
            XmlReader rulesReader =  null;
             try
            {
                wfReader = XmlReader.Create(markupFileName);
                 if (!String.IsNullOrEmpty(rulesMarkupFileName))
                {
                    rulesReader = XmlReader.Create(rulesMarkupFileName);
                    instance = _workflowRuntime.CreateWorkflow( wfReader, rulesReader, parameters);
                }
                 else
                {
                    instance = _workflowRuntime.CreateWorkflow(wfReader,  null, parameters);
                }
                wrapper = AddWorkflowInstance(instance);
                instance.Start();
            }
             finally
            {
                 if (wfReader !=  null)
                {
                    wfReader.Close();
                }
                 if (rulesReader !=  null)
                {
                    rulesReader.Close();
                }
            }
             return wrapper;
        }

        public WorkflowRuntime WorkflowRuntime
        {
             get {  return _workflowRuntime; }
        }        
         public Dictionary<Guid, WorkflowInstanceWrapper> Workflows
        {
             get {  return _workflows; }
        }        
         public  event EventHandler<WorkflowLogEventArgs> MessageEvent;    

       
         public  void ClearWorkflow(Guid workflowId)
        {
             if (_workflows.ContainsKey(workflowId))
            {
                _workflows.Remove(workflowId);
            }
        }
      
         public  void ClearAllWorkflows()
        {
            _workflows.Clear();
        }
       
         private WorkflowInstanceWrapper AddWorkflowInstance(
            WorkflowInstance instance)
        {
            WorkflowInstanceWrapper wrapper =  null;
             if (!_workflows.ContainsKey(instance.InstanceId))
            {
                wrapper =  new WorkflowInstanceWrapper(instance);
                _workflows.Add(wrapper.Id, wrapper);
            }
             return wrapper;
        }
     
         public WorkflowInstanceWrapper FindWorkflowInstance(Guid workflowId)
        {
            WorkflowInstanceWrapper result =  null;
             if (_workflows.ContainsKey(workflowId))
            {
                result = _workflows[workflowId];
            }
             return result;
        }
       
         public  void WaitAll(Int32 msecondsTimeout)
        {
             if (_workflows.Count >  0)
            {
                WaitHandle[] handles =  new WaitHandle[_workflows.Count];
                Int32 index =  0;
                 foreach (WorkflowInstanceWrapper wrapper
                     in _workflows.Values)
                {
                    handles[index] = wrapper.WaitHandle;
                    index++;
                }
                WaitHandle.WaitAll(handles, msecondsTimeout,  false);
            }
        }
     
         public  void Dispose()
        {
             if (_workflowRuntime !=  null)
            {
                _workflowRuntime.StopRuntime();
                _workflowRuntime.Dispose();
            }
            ClearAllWorkflows();
        }
      
         private  void SubscribeToEvents(WorkflowRuntime runtime)
        {
            runtime.Started +=  new EventHandler<WorkflowRuntimeEventArgs>( runtime_Started);
            runtime.Stopped +=  new EventHandler<WorkflowRuntimeEventArgs>(runtime_Stopped);
            runtime.WorkflowAborted+= ......
            runtime.WorkflowCompleted+= ......
            runtime.WorkflowCreated += ......   
            ............
        }

         void runtime_Started( object sender, WorkflowRuntimeEventArgs e)
        {
            LogStatus(Guid.Empty,  " Started ");
        }
         void runtime_Stopped( object sender, WorkflowRuntimeEventArgs e)
        {
            LogStatus(Guid.Empty,  " Stopped ");
        }
         void runtime_WorkflowCreated( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowCreated ");
        }
         void runtime_WorkflowStarted( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowStarted ");
        }
         void runtime_WorkflowIdled( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowIdled ");
        }

         void runtime_WorkflowCompleted( object sender, WorkflowCompletedEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowCompleted ");
            WorkflowInstanceWrapper wrapper
                = FindWorkflowInstance(e.WorkflowInstance.InstanceId);
             if (wrapper !=  null)
            {
                wrapper.OutputParameters = e.OutputParameters;
                wrapper.StopWaiting();
            }
        }

         void runtime_WorkflowTerminated( object sender,
            WorkflowTerminatedEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowTerminated ");
            WorkflowInstanceWrapper wrapper = FindWorkflowInstance(e.WorkflowInstance.InstanceId);
             if (wrapper !=  null)
            {
                wrapper.Exception = e.Exception;
                wrapper.StopWaiting();
            }
        }

         void runtime_WorkflowSuspended( object sender, WorkflowSuspendedEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowSuspended ");
            WorkflowInstanceWrapper wrapper = FindWorkflowInstance(e.WorkflowInstance.InstanceId);
             if (wrapper !=  null)
            {
                wrapper.ReasonSuspended = e.Error;
            }
        }

         void runtime_WorkflowResumed( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowResumed ");
        }
         void runtime_WorkflowPersisted( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowPersisted ");
        }
         void runtime_WorkflowLoaded( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowLoaded ");
        }
         void runtime_WorkflowAborted( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowAborted ");
            WorkflowInstanceWrapper wrapper
                = FindWorkflowInstance(e.WorkflowInstance.InstanceId);
             if (wrapper !=  null)
            {
                wrapper.StopWaiting();
            }
        }

         void runtime_WorkflowUnloaded( object sender, WorkflowEventArgs e)
        {
            LogStatus(e.WorkflowInstance.InstanceId,  " WorkflowUnloaded ");
        }

         private  void LogStatus(Guid instanceId, String msg)
        {
             if (MessageEvent !=  null)
            {
                String formattedMsg;
                 if (instanceId == Guid.Empty)
                {
                    formattedMsg = String.Format( " Runtime - {0} ", msg);
                }
                 else
                {
                    formattedMsg = String.Format( " {0} - {1} ", instanceId, msg);
                }
                 // raise the event
                MessageEvent( thisnew WorkflowLogEventArgs(formattedMsg));
            }
        }

    }   
     public  class WorkflowLogEventArgs : EventArgs
    {
         private String _msg = String.Empty;
         public WorkflowLogEventArgs(String msg)
        {
            _msg = msg;
        }

         public String Message
        {
             get {  return _msg; }
        }

1._workflows:一个key为Guid,value为WorkflowInstanceWrapper的字典。
2.SubscribeToEvent():给workflowRuntime订阅事件.
3.StartWorkflow():实现创建,开始工作流.
4.MessageEvent:对Message进行格式化。
5.WaitAll()用来挂起当前的线程直到所有的workflows完成,每个WorkflowInstanceWrapper有一个WaitHandle属性以便宿主程序能灵活控制。 

下面是测试代码:

 
 
using (WorkflowRuntimeManager manager
        =  new WorkflowRuntimeManager( new WorkflowRuntime( " WorkflowRuntime ")))
{
   manager.MessageEvent +=  delegate(Object sender, WorkflowLogEventArgs e)
   {
      Console.WriteLine(e.Message);
   };
   manager.WorkflowRuntime.StartRuntime();
   Dictionary<String, Object> wfArguments=  new Dictionary< stringobject>();
   wfArguments.Add( " InputString "" one ");
   WorkflowInstanceWrapper instance = manager.StartWorkflow(
            typeof(SharedWorkflows.Workflow1), wfArguments);
   instance.WorkflowInstance.Terminate( " Manually terminated ");
   instance.WaitHandle.WaitOne( 10000false);
    foreach (WorkflowInstanceWrapper wrapperin manager.Workflows.Values)
   {
        if (wrapper.OutputParameters.ContainsKey( " Result "))
       {
            Console.WriteLine(wrapper.OutputParameters[ " Result "]);
       }
        if (wrapper.Exception !=  null)
       {
     Console.WriteLine( " {0}-Exception:{1} ",wrapper.Id,wrapper.Exception.Message);
       }
        if (wrapper.ReasonSuspended.Length >  0)
       {
    Console.WriteLine( " {0}-Suspended: {1} ",wrapper.Id, wrapper.ReasonSuspended);
       }
}    manager.ClearAllWorkflows();























下面运行结果,从该结果可以清晰的看出工作流的执行过程:


1
 上一篇:坚持学习WF(5):自定义活动(CustomActivity) 
下一篇:坚持学习WF(7):流程控制(Flow Control)



本文转自生鱼片博客园博客,原文链接:http://www.cnblogs.com/carysun/archive/2008/04/21/1164609.html,如需转载请自行联系原作者

目录
相关文章
|
20小时前
|
弹性计算 关系型数据库 微服务
基于 Docker 与 Kubernetes(K3s)的微服务:阿里云生产环境扩容实践
在微服务架构中,如何实现“稳定扩容”与“成本可控”是企业面临的核心挑战。本文结合 Python FastAPI 微服务实战,详解如何基于阿里云基础设施,利用 Docker 封装服务、K3s 实现容器编排,构建生产级微服务架构。内容涵盖容器构建、集群部署、自动扩缩容、可观测性等关键环节,适配阿里云资源特性与服务生态,助力企业打造低成本、高可靠、易扩展的微服务解决方案。
1048 0
|
9天前
|
人工智能 运维 安全
|
20小时前
|
弹性计算 Kubernetes jenkins
如何在 ECS/EKS 集群中有效使用 Jenkins
本文探讨了如何将 Jenkins 与 AWS ECS 和 EKS 集群集成,以构建高效、灵活且具备自动扩缩容能力的 CI/CD 流水线,提升软件交付效率并优化资源成本。
238 0
|
7天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
8天前
|
人工智能 测试技术 API
智能体(AI Agent)搭建全攻略:从概念到实践的终极指南
在人工智能浪潮中,智能体(AI Agent)正成为变革性技术。它们具备自主决策、环境感知、任务执行等能力,广泛应用于日常任务与商业流程。本文详解智能体概念、架构及七步搭建指南,助你打造专属智能体,迎接智能自动化新时代。
|
8天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
701 23