开源工作流引擎 Workflow Core 的研究和使用教程

简介: 开源工作流引擎 Workflow Core 的研究和使用教程[TOC]一,工作流对象和使用前说明为了避免歧义,事先约定。工作流有很多节点组成,一个节点成为步骤点(Step)。1,IWorkflow / IWorkflowBuilderWorkflow Core 中,用于构建工作流的类继承 IWorkflow,代表一条有任务规则的工作流,可以表示工作流任务的开始或者 Do() 方法,或工作流分支获取其它方法。

开源工作流引擎 Workflow Core 的研究和使用教程

[TOC]

一,工作流对象和使用前说明

为了避免歧义,事先约定。

工作流有很多节点组成,一个节点成为步骤点(Step)。

1,IWorkflow / IWorkflowBuilder

Workflow Core 中,用于构建工作流的类继承 IWorkflow,代表一条有任务规则的工作流,可以表示工作流任务的开始或者 Do() 方法,或工作流分支获取其它方法。

IWorkflow 有两个同名接口:

    public interface IWorkflow<TData>
        where TData : new()
    {
        string Id { get; }
        int Version { get; }
        void Build(IWorkflowBuilder<TData> builder);
    }

    public interface IWorkflow : IWorkflow<object>
    {
    }

Id:此工作流的唯一标识符;

Version:此工作流的版本。

void Build:在此方法内构建工作流。

工作流运作过程中,可以传递数据。有两种传递方法:使用泛型,从运行工作流时就要传入;使用 object 简单类型,由单独的步骤产生并且传递给下一个节点。

IWorkflowBuilder 是工作流对象,构建一个具有逻辑规则的工作流。可以构建复杂的、具有循环、判断的工作流规则,或者并行或者异步处理工作流任务。

一个简单的工作流规则:

    public class DeferSampleWorkflow : IWorkflow
    {
        public string Id => "DeferSampleWorkflow";
            
        public int Version => 1;
            
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context =>
                {
                    // 开始工作流任务
                    Console.WriteLine("Workflow started");
                    return ExecutionResult.Next();
                })
                .Then<SleepStep>()
                    .Input(step => step.Period, data => TimeSpan.FromSeconds(20))
                .Then(context =>
                {
                    Console.WriteLine("workflow complete");
                    return ExecutionResult.Next();
                });
        }
    }

2,EndWorkflow

此对象表示当前工作流任务已经结束,可以表示主工作流或者工作流分支任务的完成。

        /// Ends the workflow and marks it as complete
        IStepBuilder<TData, TStepBody> EndWorkflow();

因为工作流是可以出现分支的,每个工作流各自独立工作,每个分支都有其生命周期。

3,容器

ForEachWhileIfWhenScheduleRecur步骤容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>

Parallel、Saga是步骤的容器,都返回 IStepBuilder<TData, Sequence>

ForEach、While、If、When、Schedule、Recur 返回类型的接口:

    public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
        where TStepBody : IStepBody
        where TReturnStep : IStepBody
    {
        /// The block of steps to execute
        IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);

Parallel、Saga :

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

        /// Execute a sequence of steps in a container
        IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

也就是说,ForEach、While、If、When、Schedule、Recur 是真正的容器。

按照我的理解,继承了 IContainerStepBuilder的,是一个容器,一个流程下的一个步骤/容器;因为 Workflow Core 作者对接口的命名很明显表达了 This a container

因为里面包含了一组操作,可以说是一个步骤里面包含了一个流程,这个流程由一系列操作组成,它是线性的,是顺序的里面是一条工作流(Workflow)。

而 Parllel、Saga,相当于步骤点的容器。

更直观的理解是电路,继承 IContainerStepBuilder 的是串联设备的容器,是顺序的;

Parllel 是并联电路/设备的一个容器,它既是一个开关,使得一条电路变成多条并流的电路,又包含了这些电路的电器。里面可以产生多条工作流,是多分支的、不同步的、独立的。

1

从实现接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都实现了 Do() 方法,而 Saga 没有实现。

关于 Saga,后面说明。

4,工作流的步骤点

实现接口如下:

IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);

        IEnumerable<WorkflowStep> GetUpstreamSteps(int id);

        IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名称 说明
StartWith 任务的开始,必须调用此方法
GetUpstreamSteps 获取上一个步骤(StepBody)的ID
UseDefaultErrorBehavior 不详

StepBody 是一个节点,IStepBuilder 构建一个节点,只有通过 StartWith,才能开始一个工作流、一个分支、异步任务等。

UseDefaultErrorBehavior笔者没有使用到,不敢瞎说。貌似与事务有关,当一个步骤点发生异常时,可以终止、重试等。

二,IStepBuilder 节点

IStepBuilder 表示一个节点,或者说一个容器,里面可以含有其它操作,例如并行、异步、循环等。

1,设置属性的方法

Name:设置此步骤点的名称;
id:步骤点的唯一标识符。

        /// Specifies a display name for the step
        IStepBuilder<TData, TStepBody> Name(string name);

        /// Specifies a custom Id to reference this step
        IStepBuilder<TData, TStepBody> Id(string id);

2,设置数据

前面说到,工作流每个步骤点传递数据有两种方式。

TData(泛型) 是工作流中,随着流传的数据,这个对象会在整个工作流程生存。

例如 Mydata

 class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyData> builder)
        {
        ...
        }
    }
 public class MyData
    {
        public int Counter { get; set; }
    }

3,Input / Output

为当前步骤点(StepBody)设置数据,亦可为 TData 设置数据。

两类数据:每个步骤点都可以拥有很多字段、属性和方法等;工作流流转 TData。

Input、Output 是设置这些数据的具体方法。

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);

        IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);

        IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);

三,工作流节点的逻辑和操作

容器操作

1,Saga

用于在容器中执行一系列操作。

    /// Execute a sequence of steps in a container
    IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

虽然注释说明 “用于在容器中执行一系列操作”,但实际上它不是一个真正的”容器“。

因为它没有继承 IContainerStepBuilder,也没有实现 Do()

但是它返回的 Sequence 实现了ContainerStepBody

如果说真正的容器相当于一条长河流中的一个湖泊(可以容纳和储水),而 Saga 可能只是某一段河流的命名,而不是具体的湖泊。

或者说 static void Main(string[] args)里面的代码太多了,新建一个方法体,把部分代码放进去。总不能把所有代码写在一个方法里吧?那么创建一个类,把代码分成多个部分,放到不同方法中,增强可读性。本质还是没有变。

Saga 可以用来处理事务,进行重试或回滚等操作。后面说明。

普通节点

1,Then

用于创建下一个节点,创建一个普通节点。可以是主工作流的节点(最外层)、或者作为循环、条件节点里的节点、作为节点中节点的节点。

 IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);

2,Attach

Then 作为普通节点,按顺序执行。操作对象是类型、StepBody。

Attach 也是普通节点,无特殊意义,通过 id 来指定要执行 StepBody 。可以作为流程控制的跳转。

相当于 goto 语句。

        /// Specify the next step in the workflow by Id
        IStepBuilder<TData, TStepBody> Attach(string id);

事件

1,WaitFor

用于定义事件,将当前节点作为事件节点,然后在后台挂起,工作流会接着执行下一个节点。在工作流停止前,可以通过指定 标识符(Id) 触发事件。在一个工作流中,每个事件的标识符都是唯一的。

        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);


        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

条件体和循环体

1,End

意思应该是结束一个节点的运行。

如果在 When 中使用,相当于 break。

        IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;

使用例子

            builder
                .StartWith<RandomOutput>(x => x.Name("Random Step"))
                    .When(0)
                        .Then<TaskA>()
                        .Then<TaskB>()                        
                        .End<RandomOutput>("Random Step")
                    .When(1)
                        .Then<TaskC>()
                        .Then<TaskD>()
                        .End<RandomOutput>("Random Step");

2,CancelCondition

在一个条件下过早地取消此步骤的执行。

应该相当于 contiune。

        /// Prematurely cancel the execution of this step on a condition
        IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);

节点的异步或多线程

1,Delay

延迟执行,使得当前节点延时执行。并非是阻塞当前的工作流运行。Delay 跟在节点后面,使得这个节点延时运行。可以理解成异步,工作流不会等待此节点执行完毕,会直接执行下一个节点/步骤。

        /// Wait for a specified period
        IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);

2,Schedule

预定执行。将当前节点设置一个时间,将在一段时间后执行。Schedule 不会阻塞工作流。

Schedule 是非阻塞的,工作流不会等待Schedule执行完毕,会直接执行下一个节点/步骤。

        /// Schedule a block of steps to execute in parallel sometime in the future
        IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);

例子

            builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => Console.WriteLine("Doing normal tasks"));

3,Recur

用于重复执行某个节点,直至条件不符。

Recur 是非阻塞的,工作流不会等待 Rezur 执行完毕,会直接执行下一个节点/步骤。

        /// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
        IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);

用于事务的操作

相当于数据库中的事务,流程中某些步骤发生异常时的时候执行某些操作。

例如:

        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<Task1>()
                    .CompensateWith<UndoTask1>()
                .Then<Task2>()
                    .CompensateWith<UndoTask2>()
                .Then<Task3>()
                    .CompensateWith<UndoTask3>()
            )
                .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
            .Then(context => Console.WriteLine("End"));

1,CompensateWith

如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。

可以作为节点的 B计划。当节点执行任务没有问题时, CompensateWith 不会运行;如果节点发生错误,就会按一定要求执行 CompensateWith 。

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);

2,CompensateWithSequence

如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。与 CompensateWith 的区别是,传入参数前者是 Func,后者是 Action。

CompensateWith 的内部实现了 CompensateWith,是对 CompensateWith 的封装。

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

3,OnError

用于事务操作,表示发生错误时如果回滚、设置时间等。一般与 Saga 一起使用。

OnError 是阻塞的。

        /// Configure the behavior when this step throws an unhandled exception
        IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

OnError 可以捕获一个容器内,某个节点的异常,并执行回滚操作。如果直接在节点上使用而不是容器,可以发生回滚,然后执行下个节点。如果作用于容器,那么可以让容器进行重新运行,等一系列操作。

OnError 可以与 When、While 等节点容器一起使用,但他们本身带有循环功能,使用事务会让代码逻辑变得奇怪。

Saga 没有条件判断、没有循环,本身就是一个简单的袋子,是节点的容器。因此使用 Saga 作为事务操作的容器,十分适合,进行回滚、重试等一系列操作。

四,条件或开关

迭代

1,ForEach

迭代,也可以说是循环。内部使用 IEnumerable 来实现。

与 C# 中 Foreach 的区别是,C# 中是用来迭代数据;

而工作流中 ForEach 用来判断元素个数,标识应该循环多少次。

ForEach 是阻塞的。

        /// Execute a block of steps, once for each item in a collection in a parallel foreach
        IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);

示例

            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();

最终会循环5次。

条件判断

1,When

条件判断,条件是否真。

When 是阻塞的。

When 可以捕获上一个节点流转的数据(非 TData)。

        /// Configure an outcome for this step, then wire it to another step
        [Obsolete]
        IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
        
        
        /// Configure an outcome for this step, then wire it to a sequence
        IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);

前一个方法例如

When(0),会捕获 return ExecutionResult.Outcome(value); 的值,判断是否相等。但是这种方式已经过时。

需要使用表达式来判断。例如

.When(data => 1)
.When(data => data.value==1)

2,While

条件判断,条件是否真。与When有区别,When可以捕获 ExecutionResult.Outcome(value);

While 是阻塞的。

        /// Repeat a block of steps until a condition becomes true
        IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);

示例

            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();

3,If

条件判断,是否符合条件。

If是阻塞的。

        /// Execute a block of steps if a condition is true
        IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);

When、While、If的区别是,When、While 是条件是否为真,If是表达式是否为真。

实质上,是语言上的区别,与代码逻辑无关。

真假用 When/While,条件判断、表达式判断用 If 。

节点并发

1,Parallel

并行任务。作为容器,可以在里面设置多组任务,这些任务将会同时、并发运行。

Parallel 是阻塞的。

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

示例:

                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();

有三个 Do,代表三个并行任务。三个 Do 是并行的,Do 内的代码,会按顺序执行。

Paeallel 的 Do:

    public interface IParallelStepBuilder<TData, TStepBody>
        where TStepBody : IStepBody
    {
        IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
        IStepBuilder<TData, Sequence> Join();
    }

比起 ForEach、When、While、If,除了有 Do,还有 Join 方法。

对于其它节点类型来说,Do直接构建节点。

对于Parallel来说,Do收集任务,最终需要Join来构建节点和运行任务。

五,其它

写得长不好看,其它内容压缩一下。

数据传递和依赖注入

Workflow Core 支持对每个步骤点进行依赖注入。

1565439224(1).png)

支持数据持久化

Workflow Core 支持将构建的工作流存储到数据库中,以便以后再次调用。

支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、

Elasticsearch、RabbitMQ... ....

支持动态调用和动态生成工作流

你可以通过 C# 代码构建工作流,或者通过 Json、Yaml 动态构建工作流。

可以利用可视化设计器,将逻辑和任务生成配置文件,然后动态传递,使用 Workflow Core 动态创建工作流。

篇幅有限,不再赘述。

有兴趣请关注 Workflow Core:https://github.com/danielgerlag/workflow-core

目录
相关文章
|
JSON 数据安全/隐私保护 数据格式
开源利器:it-tools 项目介绍
作为一名开发人员,我们在日常工作和学习中常常需要使用一系列小工具,如JSON格式化、JSON转表格、当前时间戳、XML格式化、SQL格式化、密码生成以及UUID生成等。通常情况下,我们会在网上搜索各种在线工具来满足这些需求。然而,这些在线工具虽然众多,却分散在各个网站,有些还存在登录和广告等繁琐问题。作为一名经常在编程世界里制造Bug的工程师,难道你不希望拥有一个属于自己的工具集吗?最近,我恰巧发现了一个名为IT-Tools的开源项目,它恰好包含了我们经常使用的所有工具。在本文中,我们将介绍IT-Tools的主要功能,并探讨如何使用Docker进行部署。
849 4
开源利器:it-tools 项目介绍
|
1月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
3月前
|
人工智能 测试技术 人机交互
深入浅出智能工作流(Agentic Workflow)|技术干货
著名AI学者、斯坦福大学教授吴恩达提出AI Agent的四种设计方式后,Agentic Workflow(智能体工作流)在全球范围内迅速走红,多个行业纷纷实践其应用,并推动了新的Agentic AI探索热潮。吴恩达总结了Agent设计的四种模式:自我反思、工具调用、规划设计及多智能体协作。前两者较普及,后两者则为智能体使用模式从单一大模型向多智能体协同配合完成业务流程的转变奠定了基础。
721 3
|
5月前
|
人工智能 API 决策智能
智胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐
【7月更文挑战第8天】智胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐
2479 9
智胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐
|
6月前
|
运维 Serverless Shell
Serverless 应用引擎产品使用合集之如何完成Python依赖环境配置
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7月前
|
机器学习/深度学习 自然语言处理 数据挖掘
【LangChain系列】第七篇:工作流(链)简介及实践
【5月更文挑战第21天】LangChain是一个框架,利用“链”的概念将复杂的任务分解为可管理的部分,便于构建智能应用。数据科学家可以通过组合不同组件来处理和分析非结构化数据。示例中展示了如何使用LLMChain结合OpenAI的GPT-3.5-turbo模型,创建提示模板以生成公司名称和描述。顺序链(SimpleSequentialChain和SequentialChain)则允许按顺序执行多个步骤,处理多个输入和输出
1048 1
|
开发框架 关系型数据库 Docker
推荐一个.Ner Core开发的配置中心开源项目
推荐一个.Ner Core开发的配置中心开源项目
75 0
|
人工智能 API 算法框架/工具
课时1:ModelScope社区Library技术架构介绍
课时1:ModelScope社区Library技术架构介绍
|
XML Oracle Java
Flowable工作流入门看这篇就够了
Flowable工作流入门看这篇就够了
8465 1
|
JSON 前端开发 JavaScript
这些年我开源的几个小项目
笔者是一个平平无奇的前端打工人,没有参与过啥热门开源项目的共建,所以每次说自己热爱开源都很心虚,充其量就是热爱使用开源项目,不过这两年来也陆续做了几个小项目,虽然只有时不时的来几个`star`,不过也给我安静的`github`平添了几分人气,本文就给大家推荐一下笔者的开源项目,如果觉得可以欢迎给个关注~
317 1
这些年我开源的几个小项目