坚持学习WF(19):工作流线程

简介:

置顶]坚持学习WF文章索引

 

WF中提供了很多内置的服务,其中工作流计划服务是用来管理工作流实例线程的。默认情况下WF会自动使用DefaultWorkflowSchedulerService服务。使用该服务时在工作流运行时引擎上以异步方式运行工作流实例的线程。 等待运行的工作流存储在 DefaultWorkflowSchedulerService 的内部队列中.当 DefaultWorkflowSchedulerService 要启动工作流时,从 .NET Framework 线程池中获取一个线程并使用此线程运行工作流。 MaxSimultaneousWorkflows 属性确定调度程序服务同一时间允许的并发线程数。 例如,如果限值为 4,则 DefaultWorkflowSchedulerService 将从 .NET Framework 线程池中最多获取 4 个线程来执行工作流。 如果已经运行 4 个工作流,则其他工作项(工作流)将放入队列中,最终在线程可用时执行。

 

除了使用WF默认提供的,我们还可以手动去加载ManualWorkflowSchedulerService服务。它提供了一个线程服务,该服务允许创建工作流实例的宿主应用程序指定用于运行工作流实例的 Thread。 使用此线程服务,宿主应用程序可在单个 Thread 上运行一个工作流实例(即处于同步模式)。 此模式将阻止宿主应用程序的执行,直到工作流实例进入空闲状态。 随后,只能通过使用此服务的 RunWorkflow 方法执行工作流实例。

 

下面我们通过MSDN中的一个例子来说明工作流中的线程以及如何使用DefaultWorkflowSchedulerService和ManualWorkflowSchedulerService服务。

 

一:在该实例中首先开发一个自定义活动WaitForMessageActivity.cs,代码如下:

using System;
using System.ComponentModel;
using System.Threading;
using System.Workflow.Activities;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.Runtime;
namespace Microsoft.Samples.Workflow.WorkflowThreading
{
    [ToolboxItem(typeof(ActivityToolboxItem))]
    public partial class WaitForMessageActivity: Activity
   
{
        WorkflowQueue workflowQueue;

        public WaitForMessageActivity()
        {
            InitializeComponent();
        }

        protected override void Initialize(IServiceProvider provider)
        {
            ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                         "WaitForMessageActivity执行构造函数");
            WorkflowQueuingService queuingService = (WorkflowQueuingService)provider.
                                         GetService(typeof(WorkflowQueuingService));
            this.workflowQueue = queuingService.CreateWorkflowQueue(
                                         "WaitForMessageActivityQueue", false);
            this.workflowQueue.QueueItemAvailable += this.HandleExternalEvent;
        }

        private void HandleExternalEvent(Object sender, QueueEventArgs args)
        {
            ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                         "WaitForMessageActivity外部事件处理程序");
            object data = this.workflowQueue.Dequeue();
            ActivityExecutionContext context = sender as ActivityExecutionContext;
            context.CloseActivity();
        }

        protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
        {
            ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                         "WaitForMessageActivity等待外部事件处理程序");
            return ActivityExecutionStatus.Executing;
        }
    }
}

二:工作流设计如下图:
 
WFThread1 
 
DelayActivity对工作流的影响我们在前面的文章中有过说明,我们就不管这个了。工作流的代码如下:
 
using System;
using System.ComponentModel;
using System.Workflow.Activities;
using System.Threading;
namespace Microsoft.Samples.Workflow.WorkflowThreading
{
    public sealed partial class ThreadingWorkflow : SequentialWorkflowActivity
    {
        private string branchFlag;

        public ThreadingWorkflow()
        {
            InitializeComponent();
            ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow", 
"ThreadingWorkflow: 执行构造函数"); } public string BranchFlag { get { return this.branchFlag;} set { this.branchFlag = value;} } private void OnCodeActivity1ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
"CodeActivity1的事件处理程序"); } private void OnCodeActivity2ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
"CodeActivity2的事件处理程序"); } private void OnCodeActivity3ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
"CodeActivity3的事件处理程序"); } private void IfElseBranchActivityCodeCondition(object sender, ConditionalEventArgs e) { e.Result = !this.BranchFlag.Equals("Delay", StringComparison.OrdinalIgnoreCase); } } }
三:宿主程序代码如下:
 
using System;
using System.Collections.Generic;
using System.Threading;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using System.Drawing;
namespace Microsoft.Samples.Workflow.WorkflowThreading
{
    class Program
    {
        static AutoResetEvent waitHandle = new AutoResetEvent(false);
        static AutoResetEvent readyHandle = new AutoResetEvent(false);
        static WorkflowInstance workflowInstance;
        static WorkflowRuntime workflowRuntime;

        static void Main(string[] args)
        {
            if (args.Length < 2)
            {
                Console.WriteLine("使用该形式命令行WorkflowThreading.exe [Single | Multi] 
[Delay | WaitForMessage]"
); return; } if (!args[0].Equals("Single", StringComparison.OrdinalIgnoreCase) &&
!args[0].Equals("Multi", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("指定Single 或者Multi 作为第一个命令行参数"); return; } if (!args[1].Equals("Delay", StringComparison.OrdinalIgnoreCase) &&
!args[1].Equals("WaitForMessage", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("指定Delay 或者WaitForMessage 作为第二个命令行参数"); return; } ThreadMonitor.Enlist(Thread.CurrentThread, "主机"); Console.ForegroundColor = ConsoleColor.White; using (workflowRuntime = new WorkflowRuntime()) { ManualWorkflowSchedulerService scheduler = null; if (args[0].ToString().Equals("Single", StringComparison.OrdinalIgnoreCase)) { scheduler = new ManualWorkflowSchedulerService(); workflowRuntime.AddService(scheduler); } workflowRuntime.StartRuntime(); workflowRuntime.WorkflowCompleted += OnWorkflowCompleted; workflowRuntime.WorkflowTerminated += OnWorkflowTerminated; workflowRuntime.WorkflowIdled += OnWorkflowIdled; workflowRuntime.WorkflowCreated += OnWorkflowCreated; Type type = typeof(ThreadingWorkflow); Dictionary<string, object> workflowParameters = new Dictionary<string, object>(); workflowParameters.Add("BranchFlag", args[1]); Console.WriteLine("\n--- 开始工作流之前---\n"); workflowInstance = workflowRuntime.CreateWorkflow(type, workflowParameters); workflowInstance.Start(); Console.WriteLine("\n--- 开始工作流之后---\n"); if (scheduler != null) scheduler.RunWorkflow(workflowInstance.InstanceId); readyHandle.WaitOne(); if (args[1].Equals("WaitForMessage", StringComparison.OrdinalIgnoreCase)) { workflowInstance.EnqueueItem("WaitForMessageActivityQueue", "Hello",
null, null); } if (scheduler != null) scheduler.RunWorkflow(workflowInstance.InstanceId); waitHandle.WaitOne(); workflowRuntime.StopRuntime(); } } static void OnWorkflowCreated(object sender, WorkflowEventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
"Host: Processed WorkflowCreated Event"); } static void OnWorkflowIdled(object sender, WorkflowEventArgs e) { if (workflowRuntime.GetService<ManualWorkflowSchedulerService>() != null) SetReloadWorkflowTimer(); else readyHandle.Set(); ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
"Host: Processed WorkflowIdle Event"); Console.WriteLine("\n--- 工作流空闲---\n"); } static void SetReloadWorkflowTimer() { DateTime reloadTime = workflowInstance.GetWorkflowNextTimerExpiration(); if (reloadTime == DateTime.MaxValue) { readyHandle.Set(); } else { TimeSpan timeDifference = reloadTime - DateTime.UtcNow + new TimeSpan(0, 0, 0, 0, 1); Timer timer = new System.Threading.Timer( new TimerCallback(ReloadWorkflow), null,timeDifference < TimeSpan.Zero ? TimeSpan.Zero : timeDifference, new TimeSpan(-1)); } } static void ReloadWorkflow(object state) { if (workflowInstance.GetWorkflowNextTimerExpiration() > DateTime.UtcNow) SetReloadWorkflowTimer(); else readyHandle.Set(); } static void OnWorkflowCompleted(object sender, WorkflowCompletedEventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
"Host: Processed WorkflowCompleted Event"); waitHandle.Set(); Console.WriteLine("\n--- 工作流完成---\n"); } static void OnWorkflowTerminated(object sender, WorkflowTerminatedEventArgs e) { Console.WriteLine(e.Exception.Message); waitHandle.Set(); } } }
 
在宿主程序我们根据命令行参数的不同来决定是否加载ManualWorkflowSchedulerService ,如果加载了该服务我们
调用该类的RunWorkflow方法来运行工作流实例。
 
四:ThreadMonitor类的用途是用不同的颜色为每个线程的输出着色。代码如下:
 
using System;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.Samples.Workflow.WorkflowThreading
{
    public static class ThreadMonitor
    {
        private static int threadCount;
        private static Dictionary<string, ConsoleColor> threadList = new 
Dictionary<string, ConsoleColor>(); public static void Enlist(Thread thread, string instanceName) { if ((thread.Name == null) || (thread.Name.Length == 0)) { thread.Name = string.Format("{0} 线程[{1}]", instanceName, threadCount++); if (!threadList.ContainsKey(thread.Name)) threadList.Add(thread.Name, GetConsoleColor()); } } public static void WriteToConsole(Thread thread, string instanceName, string message) { Enlist(thread, instanceName); WriteToConsole(thread, message); } public static void WriteToConsole(Thread thread, string message) { if (threadList.ContainsKey(thread.Name)) Console.ForegroundColor = threadList[thread.Name]; Console.WriteLine("{0} --> {1}", thread.Name, message); } private static ConsoleColor GetConsoleColor() { if ((int)Console.ForegroundColor < 9) return ConsoleColor.White; else return (ConsoleColor)(int)Console.ForegroundColor - 1; } } }
五:下图是执行的结果对比,我们可以清晰的看到线程的变化情况。
 
WFThread2 

本文转自生鱼片博客园博客,原文链接:http://www.cnblogs.com/carysun/archive/2008/09/07/WFThread.html,如需转载请自行联系原作者
相关文章
|
4月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
220 6
|
7月前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
146 0
|
7月前
|
调度 Python
Python多线程学习优质方法分享
Python多线程学习优质方法分享
27 0
|
7月前
|
安全 API C++
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
230 0
|
8月前
|
Java 调度
【JAVA学习之路 | 提高篇】进程与线程(Thread)
【JAVA学习之路 | 提高篇】进程与线程(Thread)
|
8月前
|
安全 Java
java-多线程学习记录
java-多线程学习记录
|
7月前
|
Java
Java线程学习经典例子-读写者演示
Java线程学习经典例子-读写者演示
27 0
|
8月前
|
Java 调度
【JAVA学习之路 | 提高篇】线程的通信
【JAVA学习之路 | 提高篇】线程的通信
|
8月前
|
存储 Java
【JAVA学习之路 | 提高篇】线程安全问题及解决
【JAVA学习之路 | 提高篇】线程安全问题及解决
|
8月前
|
Java
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)