C#多线程(16):手把手教你撸一个工作流

简介: C#多线程(16):手把手教你撸一个工作流

前言


前面学习了很多多线程和任务的基础知识,这里要来实践一下啦。通过本篇教程,你可以写出一个简单的工作流引擎。


本篇教程内容完成是基于任务的,只需要看过笔者的三篇关于异步的文章,掌握 C# 基础,即可轻松完成。


由于本篇文章编写的工作流程序,主要使用任务,有些逻辑过程会比较难理解,多测试一下就好。代码主要还是 C# 基础,为什么说简单?

  • 不包含 async 、await
  • 几乎不含包含多线程(有个读写锁)
  • 不包含表达式树
  • 几乎不含反射(有个小地方需要反射一下,但是非常简单)
  • 没有复杂的算法


因为是基于任务(Task)的,所以可以轻松设计组合流程,组成复杂的工作流。

由于只是讲述基础,所以不会包含很多种流程控制,这里只实现一些简单的。


先说明,别用到业务上。。。这个工作流非常简单,就几个功能,这个工作流是基于笔者的多线程系列文章的知识点。写这个东西是为了讲解任务操作,让读者更加深入理解任务。

代码地址:https://github.com/whuanle/CZGL.FLow

这两天忙着搬东西,今天没认真写文章,代码不明白的地方,可以到微信群找我。微信名称:痴者工良,dotnet 的群基本我都在。


节点


在开始前,我们来设计几种流程控制的东西。

将一个 步骤/流程/节点 称为 step。


Then

一个普通的节点,包含一个任务。

多个 Then 节点,可以组成一条连续的工作流。

微信图片_20220504102845.png


Parallel

并行节点,可以设置多个并行节点放到 Parallel 中,以及在里面为任一个节点创建新的分支。

微信图片_20220504102848.png

Schedule

定时节点,创建后会在一定时间后执行节点中的任务。

微信图片_20220504102851.png

Delay

让当前任务阻塞一段时间。

微信图片_20220504102854.png

试用一下


顺序节点

打开你的 VS ,创建项目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

创建一个类 MyFlow1,继承 IDoFlow


public class MyFlow1 : IDoFlow
    {
        public int Id => 1;
        public string Name => "随便起个名字";
        public int Version => 1;
        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }


你可以创建多个工作流任务,每个工作流的 Id 必须唯一。Name 和 Version 随便填,因为这里笔者没有对这几个字段做逻辑。

IDoFlowBuilder 是构建工作流的一个接口。


我们来写一个工作流测试一下。

/// <summary>
/// 普通节点 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;
    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工作流开始");
        }).Then(() =>
        {
            Console.WriteLine("下一个节点");
        }).Then(() =>
         {
             Console.WriteLine("最后一个节点");
         });
        return builder;
    }
}


Main 方法中:

static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }


.StartWith() 方法开始一个工作流;

FlowCore.RegisterWorkflow<T>() 注册一个工作流;

FlowCore.Start();执行一个工作流;


并行任务

其代码如下:

/// <summary>
    /// 并行节点 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;
        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每个并行任务也可以设计后面继续执行其它任务
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("并行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行3");
                    });
                    // 并行任务设计完成后,必须调用此方法
                    // 此方法必须放在所有并行任务 .Do() 的最后
                    steps.EndParallel();
                    // 如果 .Do() 在 EndParallel() 后,那么不会等待此任务
                    steps.Do(() => { Console.WriteLine("并行异步"); });
                    // 开启新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });
                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });
            return builder;
        }
    }


Main 方法中:

static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }


通过以上示例,可以大概了解本篇文章中我们要写的程序。


编写工作流


建立一个类库项目,名为 DoFlow

建立 ExtensionsInterfacesServices 三个目录。


接口构建器

新建 IStepBuilder 接口文件到 Interfaces 目录,其内容如下:

using System;
namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通节点
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);
        /// <summary>
        /// 多个节点
        /// <para>默认下,需要等待所有的任务完成,这个step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一个任务完成即可跳转到下一个step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);
        /// <summary>
        /// 节点将在某个时间间隔后执行
        /// <para>异步,不会阻塞当前工作流的运行,计划任务将在一段时间后触发</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);
        /// <summary>
        /// 阻塞一段时间
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}


新建 IStepParallel 文件到 Interfaces 目录。

using System;
namespace DoFlow.Interfaces
{
    /// <summary>
    /// 并行任务
    ///  <para>默认情况下,只有这个节点的所有并行任务都完成后,这个节点才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一个并行任务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);
        /// <summary>
        /// 开始一个分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);
        /// <summary>
        /// 必须使用此方法结束一个并行任务
        /// </summary>
        void EndParallel();
    }
    /// <summary>
    /// 并行任务
    /// <para>任意一个任务完成后,就可以跳转到下一个 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {
    }
}


工作流构建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目录。

using System;
using System.Threading.Tasks;
namespace DoFlow.Interfaces
{
    /// <summary>
    /// 构建工作流任务
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 开始一个 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);
        Task ThatTask { get; }
    }
}


新建 IDoFlow 接口文件到 Interfaces 目录。

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 工作流
    /// <para>无参数传递</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局唯一标识
        /// </summary>
        int Id { get; }
        /// <summary>
        /// 标识此工作流的名称
        /// </summary>
        string Name { get; }
        /// <summary>
        /// 标识此工作流的版本
        /// </summary>
        int Version { get; }
        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}


依赖注入

新建 DependencyInjectionService 文件到 Services 目录。

用于实现依赖注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;
namespace DoFlow.Services
{
    /// <summary>
    /// 依赖注入服务
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服务
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }
        /// <summary>
        /// 添加一个注入到容器服务
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }
        /// <summary>
        /// 获取需要的服务
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}


添加一个 InitExtension 文件到 Extensions 目录。

using DoFlow.Interfaces;
using DoFlow.Services;
namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}


实现工作流解析

以下文件均在 Services 目录建立。

新建 StepBuilder 文件,用于解析节点,构建任务。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;
namespace DoFlow.Services
{
    /// <summary>
    /// 节点工作引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;
        /// <summary>
        /// 延迟执行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }
        /// <summary>
        /// 并行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }
        /// <summary>
        /// 计划任务
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }
        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }
        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}


新建 StepParallel 文件,里面有两个类,用于实现同步任务。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace DoFlow.Services
{
    /// <summary>
    /// 第一层所有任务结束后才能跳转下一个 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }
        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }
        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });
            return _stepBuilder;
        }
    }
    /// <summary>
    /// 完成任意一个任务即可跳转到下一个 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }
        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }
        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });
            return _stepBuilder;
        }
    }
}


新建 DoFlowBuilder 文件,用于构建工作流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;
namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;
        public void EndWith(Action action)
        {
            _task.Start();
        }
        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);
            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}


新建 FlowEngine 文件,用于执行工作流。

using DoFlow.Interfaces;
namespace DoFlow.Services
{
    /// <summary>
    /// 工作流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }
        /// <summary>
        /// 开始一个工作流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}


新建 FlowCore 文件,用于存储和索引工作流。使用读写锁解决并发字典问题。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;
namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();
        // 读写锁
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();
        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }
        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {
            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }
        /// <summary>
        /// 要启动的工作流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 读写锁
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();
                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }
            engine.Start();
            return true;
        }
    }
}

就这样程序写完了。

相关文章
|
5月前
|
数据采集 XML JavaScript
C# 中 ScrapySharp 的多线程下载策略
C# 中 ScrapySharp 的多线程下载策略
|
2月前
|
开发工具 C# git
C#一分钟浅谈:Git 版本控制与 GitFlow 工作流
【10月更文挑战第22天】本文介绍了 Git 和 GitFlow 的结合使用,从基础概念到具体操作,涵盖了安装配置、基本命令、GitFlow 工作流的核心分支和流程示例。同时,文章还讨论了常见的问题和易错点,如忽略文件、冲突解决、回退提交和分支命名规范,并提供了代码案例。通过学习本文,读者可以更好地理解和应用 Git 及 GitFlow,提高团队协作效率。
70 1
|
4月前
|
安全 数据库连接 API
C#一分钟浅谈:多线程编程入门
在现代软件开发中,多线程编程对于提升程序响应性和执行效率至关重要。本文从基础概念入手,详细探讨了C#中的多线程技术,包括线程创建、管理及常见问题的解决策略,如线程安全、死锁和资源泄露等,并通过具体示例帮助读者理解和应用这些技巧,适合初学者快速掌握C#多线程编程。
90 0
|
5月前
|
安全 C# 开发者
【C# 多线程编程陷阱揭秘】:小心!那些让你的程序瞬间崩溃的多线程数据同步异常问题,看完这篇你就能轻松应对!
【8月更文挑战第18天】多线程编程对现代软件开发至关重要,特别是在追求高性能和响应性方面。然而,它也带来了数据同步异常等挑战。本文通过一个简单的计数器示例展示了当多个线程无序地访问共享资源时可能出现的问题,并介绍了如何使用 `lock` 语句来确保线程安全。此外,还提到了其他同步工具如 `Monitor` 和 `Semaphore`,帮助开发者实现更高效的数据同步策略,以达到既保证数据一致性又维持良好性能的目标。
70 0
|
7月前
|
并行计算 算法 C#
C# Mandelbrot和Julia分形图像生成程序更新到2010-9-14版 支持多线程计算 多核处理器
此文档是一个关于分形图像生成器的介绍,作者分享了个人开发的M-J算法集成及色彩创新,包括源代码和历史版本。作者欢迎有兴趣的读者留言交流,并提供了邮箱(delacroix_xu@sina.com)以分享资源。文中还展示了程序的发展历程,如增加了真彩色效果、圈选放大、历史记录等功能,并分享了几幅精美的分形图像。此外,还提到了程序的新特性,如导入ini文件批量输出图像和更新一批图片的功能。文档末尾附有多张程序生成的高分辨率分形图像示例。
|
7月前
|
大数据 C#
C#实现多线程的几种方式
C#实现多线程的几种方式
|
8月前
|
安全 Java C#
C#多线程详解
C#多线程详解
78 0
|
8月前
|
开发框架 前端开发 .NET
C#编程与Web开发
【4月更文挑战第21天】本文探讨了C#在Web开发中的应用,包括使用ASP.NET框架、MVC模式、Web API和Entity Framework。C#作为.NET框架的主要语言,结合这些工具,能创建动态、高效的Web应用。实际案例涉及企业级应用、电子商务和社交媒体平台。尽管面临竞争和挑战,但C#在Web开发领域的前景将持续拓展。
233 3
|
2月前
|
C# 开发者
C# 一分钟浅谈:Code Contracts 与契约编程
【10月更文挑战第26天】本文介绍了 C# 中的 Code Contracts,这是一个强大的工具,用于通过契约编程增强代码的健壮性和可维护性。文章从基本概念入手,详细讲解了前置条件、后置条件和对象不变量的使用方法,并通过具体代码示例进行了说明。同时,文章还探讨了常见的问题和易错点,如忘记启用静态检查、过度依赖契约和性能影响,并提供了相应的解决建议。希望读者能通过本文更好地理解和应用 Code Contracts。
44 3