工作者引擎 IWorkerEngine -- ESBasic 可复用的.NET类库(05)

简介: 1.缘起:     假设我们的系统在运行的过程中,源源不断的有新的任务需要处理(比如订单处理),而且这些任务的处理是相互独立的,没有前后顺序依赖性(顺序依赖性是指,必须在任务A处理结束后才可开始B任务),那么我们就可以使用多个线程来同时处理多个任务。

1.缘起:

    假设我们的系统在运行的过程中,源源不断的有新的任务需要处理(比如订单处理),而且这些任务的处理是相互独立的,没有前后顺序依赖性(顺序依赖性是指,必须在任务A处理结束后才可开始B任务),那么我们就可以使用多个线程来同时处理多个任务。每个处理任务的线程称为“工作者(线程)”。
      
我设计了ESBasic.Threading.Engines.IWorkerEngine工作者引擎,其目的就是使用多个线程来并行处理任务,提高系统的吞吐能力。

      工作者引擎的形象示意图如下:
      
 

2.适用场合:

设计工作者引擎ESBasic.Threading.Engines.IWorkerEngine的主要目的是为了解决类似下面的问题:

(1)充分利用多CPU、多核计算资源。

(2)减少因高速设备与低速设备之间速度差而产生计算资源浪费。

(3)对于突发的大批量的任务(比如订单系统经常在其它时段接受的订单很少,但在某高峰期会有突发性的大量的订单进来)进行缓冲处理,并最大限度地利用现有资源进行处理。

 

3.设计思想与实现

       IWorkerEngine的设计思路是这样的:我们使用一个队列来存放需要处理的任务,新来的任务都会排队到这个队列中,然后有N个工作者线程不断地从队列中取出任务去处理,每个线程处理完当前任务后,又从队列中取出下一个任务……,如此循环。

       IWorkerEngine接口的源码对应如下:    

 

    public   interface   IWorkerEngine < T >
    {
        
///   <summary>
        
///  IdleSpanInMSecs 当没有工作要处理时,工作者线程休息的时间间隔。默认为10ms
        
///   </summary>
         int  IdleSpanInMSecs {  get ; set ; }

        
///   <summary>
        
///  WorkerThreadCount 工作者线程的数量。默认值为1。
        
///   </summary>
         int  WorkerThreadCount {  get set ; }

        
///   <summary>
        
///  WorkProcesser 用于处理任务的处理器。
        
///   </summary>
         IWorkProcesser < T >  WorkProcesser {  set ; }
        
        
///   <summary>
        
///  WorkCount 当前任务队列中的任务数。
        
///   </summary>
         int  WorkCount {  get ; }

        
///   <summary>
        
///  MaxWaitWorkCount 历史中最大的处于等待状态的任务数量。
        
///   </summary>
         int  MaxWaitWorkCount {  get ; }

        
void  Initialize();
        
void  Start();
        
void  Stop();

        
///   <summary>
        
///  AddWork 添加任务。
        
///   </summary>        
         void  AddWork(T work); 
    }

   

由于任务的类型不是固定的,所以我们使用的泛型参数T来表示要处理任务的类型。

所有的任务的具体执行都是由IWorkProcesser完成的:    

 

    public   interface   IWorkProcesser < T >
    {
        
void  Process(T work);
    }


      实现这个IWorkerEngine接口的时候要注意以下几点:

(1)AddWork方法会在多线程的环境中被调用,所以必须保证其是线程安全的。

(2)每个工作者线程实际上就是一个我们前面介绍的循环引擎ICycleEngine,只不过将其DetectSpanInSecs设为0即可,表示不间断地执行任务。WorkerEngine便是使用了NAgileCycleEngine实例来作为工作者的。这些AgileCycleEngine实例在Initialize方法中被实例化。

(3)所有的工作者最终都是执行私有的DoWork方法,这个方法就是从任务队列中取出任务并且调用IWorkProcesser来处理任务,如果任务队列为空,则等待IdleSpanInMSecs秒钟后再重试。

(4)MaxWaitWorkCount属性用于记录自从引擎运行以来最大的等待任务的数量,通过这个属性我们可以推测任务量与任务处理速度之间的差距。

(5)通过StartStop方法我们可以随时停止、启动工作者引擎,并可重复调用。

4. 使用时的注意事项

(1)     当引擎已经启动并正在运行时,如果要修改WorkerThreadCount的值并使其生效,则必须先调用Stop方法停止引擎,然后重新调用Initialize方法初始化引擎,再调用Start方法启动引擎。

(2)     关于工作者线程的个数N的设置的问题。这个数字不是越大越好,因为使用的线程越多,而CPU跟不上的话,那么消耗在线程切换上的浪费就越严重。所以,为了达到最好的性能,需要为工作者线程个数设置一个合适的值。
通常,这个值跟CPU的个数、CPU核的个数、任务的复杂度、慢速设备与快速设备之间的速度差以及它们的吞吐量有关。我们可以通过足够的测试来发现适合我们系统的N值。

      一般情况下的推荐值为:CPU个数*单个CPU的核数*2 + 1
 

5.扩展

1)“一次性”的工作者引擎:BriefWorkerEngine

    假设我们的系统可能会偶尔有一批任务要处理(也许永远也不会有这样的任务出现),我们希望只有当任务到来时,才使用一个工作者引擎实例来多线程处理它,处理完后,该引擎就可以释放掉。

      ESBasic.Threading.Engines.BriefWorkerEngine,精简的工作者引擎,便是为这一目的而设计的。它使用多线程处理一批任务,当这批任务处理结束后,工作者线程会被自动释放,而该引擎实例也就可以被结束了。

    为了方便使用,我将BriefWorkerEngine设计为从构造函数注入引擎运行所需要的参数,包括任务处理器、工作者线程个数、以及要处理的任务集合。在引擎实例被构造成功的同时,内部的循环引擎已经准备好了。注意,BriefWorkerEngine实现了IDisposable接口,这表明当引擎被释放时,内部所有的循环引擎都会停止运行,从而不再占有后台线程池中的线程。

我们可以这样来使用BriefWorkerEngine

 

            IWorkProcesser < MyTask >  processer  =  ... ;
            
IList < MyTask >  taskList  =  ... ;
            
BriefWorkerEngine < MyTask >  engine  =   new   BriefWorkerEngine < MyTask > (processer,  5 , taskList);
            engine.Start();
            
while  ( ! engine.IsFinished())
            {
                System.Threading.Thread.Sleep(
100 );
            }
            engine.Dispose();
            
// 执行到这里,表示所有任务已经处理完毕,引擎实例即将被释放。

       我们可以通过它的IsFinished方法来检测执行是否已经完成。当IsFinished方法返回true时,引擎实例就可以被销毁了。


2)永不停止的工作者引擎

我们同样可以考虑一个类似于循环引擎的扩展的情况,假设我们的系统要求在启动时就将工作者引擎运行起来,而且在整个运行的生命周期中,都不需要停止引擎,那么我们就不想将Start方法、Stop方法暴露出来以免意外的调用Stop方法而导致引擎停止运行,那这个时候我们可以使用相同的技巧来做到:

 

    public   sealed   class   MyWorkerEngine
    {
        
private   IWorkerEngine < MyTask >  workerEngine;

        
public   void  Initialize()
        {
            
this .workerEngine  =   new   WorkerEngine < MyTask > ();
            
this .workerEngine.WorkerThreadCount  =   5 ;
            
// this.workerEngine.WorkProcesser = .. 赋值
             this .workerEngine.Initialize();
            
this .workerEngine.Start();
        }
    }

    
public   class   MyTask    {    }

其道理与循环引擎的扩展是一样的。

注: ESBasic已经开源,点击这里下载源码。
    
ESBasic开源前言


目录
相关文章
|
3月前
|
开发框架 .NET Linux
2款高效的.NET二维码生成类库
2款高效的.NET二维码生成类库
|
3月前
|
XML 开发框架 数据格式
.Net Core 开发框架,支持多版本的类库
.Net Core 开发框架,支持多版本的类库
56 0
|
4月前
|
人工智能 开发框架 Devops
.NET技术概览:** 本文探讨了.NET的核心特性,包括多语言支持、Common Language Runtime、丰富的类库和跨平台能力,强调其在企业级、Web、移动及游戏开发中的应用。
【7月更文挑战第4天】.NET技术概览:** 本文探讨了.NET的核心特性,包括多语言支持、Common Language Runtime、丰富的类库和跨平台能力,强调其在企业级、Web、移动及游戏开发中的应用。此外,讨论了.NET如何通过性能优化、DevOps集成、AI与ML支持以及开源策略应对未来挑战,为开发者提供强大工具,共创软件开发新篇章。
51 3
|
4月前
|
人工智能 前端开发 Devops
NET技术在现代开发中的影响力日益增强,本文聚焦其核心价值,如多语言支持、强大的Visual Studio工具、丰富的类库和跨平台能力。
【7月更文挑战第4天】**.NET技术在现代开发中的影响力日益增强,本文聚焦其核心价值,如多语言支持、强大的Visual Studio工具、丰富的类库和跨平台能力。实际应用涵盖企业系统、Web、移动和游戏开发,以及云服务。面对性能挑战、容器化、AI集成及跨平台竞争,.NET持续创新,开发者应关注技术趋势,提升技能,并参与社区,共同推进技术发展。**
37 1
|
4月前
|
开发框架 .NET API
.NET Core 和 .NET 标准类库项目类型有什么区别?
在 Visual Studio 中,可创建三种类库:.NET Framework、.NET Standard 和 .NET Core。.NET Standard 是规范,确保跨.NET实现的API一致性,适用于代码共享。.NET Framework 用于特定技术,如旧版支持。.NET Core 库允许访问更多API但限制兼容性。选择取决于兼容性和所需API:需要广泛兼容性时用.NET Standard,需要更多API时用.NET Core。.NET Standard 替代了 PCL,促进多平台共享代码。
|
6月前
|
开发框架 安全 .NET
【专栏】.NET 技术:推动开发进步的引擎
【4月更文挑战第29天】本文探讨了.NET技术在软件开发中的重要性,强调其跨平台兼容性、丰富的开发工具和框架、高效性能及强安全稳定性。.NET的特性加速了应用开发,提升了质量和可靠性,并促进了创新和业务发展,同时培育了专业人才和技术社区。随着未来的发展,.NET将持续创新,为软件开发和数字化转型贡献力量,成为行业前进的关键驱动力。
39 0
|
6月前
|
C# 数据安全/隐私保护
一款实用的.NET Core加密解密工具类库
一款实用的.NET Core加密解密工具类库
|
2月前
|
开发框架 前端开发 JavaScript
ASP.NET MVC 教程
ASP.NET 是一个使用 HTML、CSS、JavaScript 和服务器脚本创建网页和网站的开发框架。
37 7
|
2月前
|
存储 开发框架 前端开发
ASP.NET MVC 迅速集成 SignalR
ASP.NET MVC 迅速集成 SignalR
52 0