坚持学习WF(11):工作流通信与队列

简介:

[置顶]坚持学习WF文章索引 

WF 提供的通信模型是构建于队列系统的基础之上,我们可以使用自定义活动来注册以接收关于队列的消息,而宿主应用程序中的服务则发送关于队列的消息。自定义活动可以使用此模型来处理外部事件,也可以传递异步活动执行的完成。这样,您的活动可以先执行到某一点,然后等待激发因素的到来以便继续执行。下图描述了宿主应用程序中的代码与工作流中的代码(或活动)之间的通信模型。

CouQueue

下面这张图是WF类库中和队列相关的三个类:

QueueClass

为了使自定义活动能够侦听消息是否到达某个队列,我们通常有以下步骤:

1.使用WorkflowQueuingService 创建工作流队列,该类还提供了创建、查找或删除工作流队列所需的方法。一般我们会在自定义活动的 Initialize 或 Execute 方法中来实现。

2.自定义活动必须注册才能接收到这些通知,方法是在工作流队列自身中注册 QueueItemAvailable 事件。您可以使用 RegisterForQueueItemAvailable 方法为 QueueItemAvailable 事件注册一个订户。QueueItemAvailable事件用于通知订户项已经传送(以异步方式)至此 WorkflowQueue。在确保队列存在并注册事件后,当队列中有可用项目时,您的活动会得到通知,之后,您可以从队列中取出该项目并对其进行处理。

3.我们自定义活动要能够充当事件接收器的活动(如 HandleExternalEvent 活动),您还需要实现 IEventActivity 接口。如果您的活动要侦听事件,则此接口用于定义该活动的主要职责:


 
 
public  interface IEventActivity 
{
    void Subscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler);
    void Unsubscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler);
    IComparable QueueName get; } 
}

WF中所有的通信的活动都实现了这个接口。

QueueName 属性必须返回 IComparable 值,消息加入队列时,它可以唯一地标识您的活动。对于用于将消息加入队列以通知工作流运行时的代码,也需要使用这同一个队列名。

通过此接口,能够命令活动在其执行前订阅事件并让活动知道何时取消订阅。在订阅和取消订阅方法中,该活动负责确保使用 QueueName 来创建队列并在处理结束时删除队列。此外,这也为您的活动能够向任何本地服务注册信息提供了机会,这些本地服务将代表活动来执行逻辑并通过将消息加入队列予以响应。

本地服务是您定义并从主机添加到工作流运行时的一个类,它可以被您的宿主代码、工作流或您的活动所利用。只要宿主应用程序处于运行状态,本地服务就能够维护事件处理程序或其他侦听程序,从而可通过将消息加入队列来确保相应的数据到达工作流。您传递给本地服务的信息应包括队列名和工作流实例 ID 的相关信息,以及该服务发起工作或向您的活动返回结果时所需的任何信息。

1.下面我们先来实现一个这样的自定义活动,利用该活动得到对列中的信息,然后在将该信息发送给宿主程序代码如下:

 

RequestResponseData.cs

1.1.OutputValues和InputValues两个依赖属性,代表输入和输出参数。
1.2.重写了Execute方法,在这之中我们完成了创建队列,注册事件等操作。
1.3.ProcessMessage在该方法中我们得到队列的消息并输出,注意这个中我们使用的是Peek方法。
1.4.重写了OnSequenceComplete方法,并将队列中的信息保存在MessageHelper中。

2.一个辅助类MessageHelper.cs

MessageHelper .cs

 

3.然后我们新建一个顺序工作流,添加我们自定义的活动,在自定义活动中拖一个CodeActivity,实现工作流 如下图:

QueueFlow

完整代码如下:

 

Workflow1.cs


4.实现宿主程序,代码如下

 

Program


在宿主程序中我们将inputs 参数加入工作流实例队列中,我们使用的是WrokflowInstance的EnqueueItemOnIdle方法,该类还包含另一个方法EnqueueItem,如下:

public  void EnqueueItem(IComparable queueName,  object item, 
    IPendingWork pendingWork,  object workItem); 

public  void EnqueueItemOnIdle(IComparable queueName,  object item, 
    IPendingWork pendingWork,  object workItem); 

两个方法都使用相同的参数,包括要排队的队列名称和对象。(我会简要介绍其他两个参数。)此时,实例 ID 和队列名称是如此重要的原因就显而易见了。为了获得 WorkflowInstance 对象,您需要实例 ID,而为了将数据排入该实例的队列,您需要队列名称。两种方法之间的差异在于它们如何将数据传送到队列。

EnqueueItem 方法试图将数据立即传送到队列,并假定工作流已到达已创建队列并在等待数据的那个点。在某些情况下,这正好是您希望发生的情况。在另一些情况下,这类行为可能会导致出现一种争用状况,即您的工作流尚未到达该活动可以创建您感兴趣的那个队列的点。您可以通过调用 GetWorkflowQueueData 并在队列就位之前一直检查来检查这种情况,但这非常麻烦。

EnqueueItemOnIdle 方法旨在解决这些争用状况。当您使用此方法将数据排入队列时,运行时在该工作流变成空闲前,不会尝试将数据传送到队列中(当目前执行的所有活动都在等待某种输入时)。这可以让宿主应用程序更加确定,工作流已为正在提交的数据做好准备,因为更有可能创建队列的活动已经开始执行,并且已真正创建了该队列。

在两个方法的第三个参数为IPendingWork ,该接口标识可添加到工作流的工作批并参与事务的类。workItem 参数是在提交或完成事务时,通过事务,您可以在消息已成功传送到队列且工作流已提交当前事务时获得通知。这是发挥这两个额外参数 EnqueueItem 和 EnqueueItemOnIdle 的作用的地方。该类在实现 IPendingWork 过程中可以使用的状态或数据。EnqueuePendingWorkItem是一个简单的类,它实现了 IPendingWork 接口,并可用来在数据被传送到队列时获得通知。代码如下:

EnqueuePendingWorkItem .cs

5.工作流执行的结果如下:

Request:inputvalueone 
Request:inputvaluetwo 
Commit called on pending work item 
Outcome of the submission: Success 
Response:inputvalueone 
Response:inputvaluetwo 
请按任意键继续. . .



本文转自生鱼片博客园博客,原文链接:http://www.cnblogs.com/carysun/archive/2008/06/07/WFQueue.html,如需转载请自行联系原作者

目录
相关文章
|
弹性计算 负载均衡 关系型数据库
如何提高业务系统的稳定性
【6月更文挑战第21天】如何提高业务系统的稳定性
基于GA-PSO遗传粒子群混合优化算法的CVRP问题求解matlab仿真
本文介绍了一种基于GA-PSO混合优化算法求解带容量限制的车辆路径问题(CVRP)的方法。在MATLAB2022a环境下运行,通过遗传算法的全局搜索与粒子群算法的局部优化能力互补,高效寻找最优解。程序采用自然数编码策略,通过选择、交叉、变异操作及粒子速度和位置更新,不断迭代直至满足终止条件,旨在最小化总行驶距离的同时满足客户需求和车辆载重限制。
|
jenkins 持续交付 开发者
利用Docker容器化部署应用的实战指南
【6月更文挑战第27天】本文详述Docker应用部署,涵盖Docker基本概念、安装、镜像制作及运行。通过编写Dockerfile构建镜像,使用`docker build`、`run`、`push`及`stop`命令管理。集成CI/CD工具如Jenkins,实现自动化构建、测试和部署,提升开发效率与部署质量。Docker助力轻量级、可移植的微服务架构。
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI问题之loss为负数如何解决
人工智能平台PAI是指阿里云提供的机器学习平台服务,支持建模、训练和部署机器学习模型;本合集将介绍机器学习PAI的功能和操作流程,以及在使用过程中遇到的问题和解决方案。
329 1
|
存储 JSON 监控
【Go】基于 Gin 从0到1搭建 Web 管理后台系统后端服务(一)项目初始化、配置和日志(上)
【Go】基于 Gin 从0到1搭建 Web 管理后台系统后端服务(一)项目初始化、配置和日志(上)
|
机器学习/深度学习 存储 文字识别
基于ERNIELayout&pdfplumber-UIE的多方案学术论文信息抽取
基于ERNIELayout&pdfplumber-UIE的多方案学术论文信息抽取,小样本能力强悍,OCR、版面分析、信息抽取一应俱全。
|
应用服务中间件 Linux nginx
Linux下启动Nginx时报错:nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)
Linux下启动Nginx时报错:nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)
2209 0
Linux下启动Nginx时报错:nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)
|
JavaScript
我熬夜开发了一款简约实用、支持多平台的Markdown在线编辑器(开源)
我熬夜开发了一款简约实用、支持多平台的Markdown在线编辑器(开源)
我熬夜开发了一款简约实用、支持多平台的Markdown在线编辑器(开源)
|
前端开发 Java 关系型数据库
你们要的测试练习网站来了
搭建测试环境首选的有tpshop、shopxo、iwebshop这类php开发的电商网站,虽然部署方便,但是却跟企业实际的架构相差太远,不利于我们更好的了解和学习软件测试。
你们要的测试练习网站来了
|
存储 Dart Shell
探索Flutter中线程模型/消息循环的底层逻辑
多线程模型以及线程中的事件循环机制在 OS 里都是必不可少的一部分,也扮演着非常重要的角色,主要用来做异步任务的分发与调度。例如浏览器 JSEngine 中的单线程事件循环机制,那么 Flutter 中的线程模型与事件循环是如何实现的呢?
探索Flutter中线程模型/消息循环的底层逻辑