.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow

简介: 在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅 什么是TPL? Task Parallel Library (TPL), 在.NET Framework 4微软推出TPL,并把TPL作为编写多线程和并行代码的首选方式,但是,在国内,到目前为止好像用的人并不多。

在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅 Smile with tongue out

什么是TPL?

Task Parallel Library (TPL), 在.NET Framework 4微软推出TPL,并把TPL作为编写多线程和并行代码的首选方式,但是,在国内,到目前为止好像用的人并不多。(TPL)是System.ThreadingSystem.Threading.Tasks命名空间中的一组公共类型和API 。TPL的目的是通过简化向应用程序添加并行性和并发性的过程来提高开发人员的工作效率,TPL动态地扩展并发度,以最有效地使用所有可用的处理器。通过使用TPL,您可以最大限度地提高代码的性能,让我们专注于程序本身而不用去关注负责的多线程管理。

出自: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl

为什么使用TPL?

在上面介绍了什么是TPL,可能大家还是云里雾里,不知道TPL的好处到底是什么。

我在youtube上找到了一个优秀的视频,讲述的是TPL和Thread的区别,我觉得对比一下,TPL的优势很快就能体现出来,如果大家能打开的话建议大家一定要看看。

地址是:https://www.youtube.com/watch?v=No7QqSc5cl8

现如今,我们的电脑的CPU怎么也是2核以上,下面假设我的电脑是四核的,我们来做一个实验。

使用Thread

代码中,如果使用Thread来处理任务,如果不做特出的处理,只是thread.Start(),监测电脑的核心的使用情况是下面这样的。

TIM截图20181003221820

每一条线代表CPU某个核心的使用情况,明显,随着代码Run起来,其实只有某一个核心的使用率迅速提升,其他核心并无明显波动,为什么会这样呢?

 

TIM截图20181003221925

原来,默认情况下,操作系统并不会调用所有的核心来处理任务,即使我们使用多线程,其实也是在一个核心里面运行这些Thread,而且Thread之间涉及到线程同步等问题,其实,效率也不会明显提高。

使用TPL

在代码中,引入了TPL来处理相同的任务,再次监视各个核心的使用情况,效果就变得截然不同,如下。

TIM截图20181003222605

可以看到各个核心的使用情况都同时有了明显的提高。

TIM截图20181003222044

说明使用TPL后,不再是使用CPU的某个核心来处理任务了,而是TPL自动把任务分摊给每个核心来处理,处理效率可想而知,理论上会有明显提升的(为什么说理论上?和使用多线程一样,各个核心之间的同步管理也是要占用一定的效率的,所以对于并不复杂的任务,使用TPL可能适得其反)。

实验结果出自https://www.youtube.com/watch?v=No7QqSc5cl8

看了这个实验讲解,是不是理解了上面所说的这句。

TPL的目的是通过简化向应用程序添加并行性和并发性的过程来提高开发人员的工作效率,TPL动态地扩展并发度,以最有效地使用所有可用的处理器。

 

所以说,使用TPL 来处理多线程任务可以让你不必吧把精力放在如何提高多线程处理效率上,因为这一切,TPL 能自动地帮你完成。

TPL Dataflow?

TPL处理Dataflow是TPL强大功能中的一种,它提供一套完整的数据流组件,这些数据流组件统称为TPL Dataflow Library,那么,在什么场景下适合使用TPL Dataflow Library呢?

官方举的一个 栗子 再恰当不过:

例如,通过TPL Dataflow提供的功能来转换图像,执行光线校正或防红眼,可以创建管道数据流组件,管道中的每个功能可以并行执行,并且TPL能自动控制图像流在不同线程之间的同步,不再需要Thread 中的Lock。

TPL数据流库由Block组成,Block是缓冲和处理数据的单元,TPL定义了三种最基础的Block。

source blocksSystem.Threading.Tasks.Dataflow.ISourceBlock <TOutput>),源块充当数据源并且可以从中读取。

target blocksSystem.Threading.Tasks.Dataflow.ITargetBlock <TInput>,目标块充当数据接收器并可以写入。

propagator blocksSystem.Threading.Tasks.Dataflow.IPropagatorBlock <TInput,TOutput>),传播器块充当源块和目标块,并且可以被读取和写入。它继承自ISourceBlock <TOutput>ITargetBlock <TInput>

 

还有其他一些个性化的Block,但其实他们都是对这三种Block进行一些扩充,可以结合下面的代码来理解这三种Block.

Code Show

1.source block 和 target block 合并成propagator block.

 

private IPropagatorBlock<string, Dictionary<int, string>> Process1()
        {
            var bufferBlock = new BufferBlock<Dictionary<int, string>>();
            var actionBlock = new ActionBlock<string>(x =>
              {
                  Console.WriteLine($"Process1 处理中:{x}");
                  Thread.Sleep(5000);
                  var dic = new Dictionary<int, string> { { 0, x } };
                  dic.Add(1, "Process1");
                  bufferBlock.Post(dic);
              }, new ExecutionDataflowBlockOptions
              {
                  MaxDegreeOfParallelism = _maxDegreeOfParallelism
              });
            actionBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process1 Complete,State{_.Status}");
                bufferBlock.Complete();
            });
            return DataflowBlock.Encapsulate(actionBlock, bufferBlock);
        }

 

可以看到,我定义了BufferBlock和ActionBlock,它们分别继承于ISourceBlock 和 ITargetBlock ,所以说,他们其实就是源块和目标块,在new actionBlock()中传入了一个Action<String>,该Action就是该Block所执行的任务。 最后,DataflowBlock.Encapsulate(actionBlock, bufferBlock)把源块和目标块合并成了一个传递块。

2.TransformBlock

private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2()
        {
            var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>
                  {
                      Console.WriteLine($"Process2 处理中:{dic.First().Value}");
                      Thread.Sleep(5000);
                      dic.Add(2, "Process2");
                      return dic;
                  }, new ExecutionDataflowBlockOptions
                  {
                      MaxDegreeOfParallelism = _maxDegreeOfParallelism
                  }
               );

            block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process2 Complete,State{_.Status}");
            });

            return block;
        }

TransfromBlock继承了IPropagatorBlock,所以它本身就是一个传递块,所以它除了要处理出入数据,还要返回数据,所以给new TransformBlock()中传入的是Func<TInput, TOutput>而不是Action<TInput>.

 

3.TargetBlock来收尾

private ITargetBlock<Dictionary<int, string>> Process3()
        {
            var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>
               {
                   Console.WriteLine($"Process3 处理中:{dic.First().Value}");
                   Thread.Sleep(5000);
                   dic.Add(3, "Process3");
                   Console.WriteLine("Dic中的内容如下:");
                   foreach (var item in dic)
                   {
                       Console.Write($"{item.Key}:{item.Value}||");
                   }
                   Console.WriteLine();
               }, new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               });
            return actionBlock;
        }

TargetBlock只能写入并处理数据,不能读取,所以TargetBlock适合作为Pipeline的最后一个Block。

 

4.控制每个Block的并行度

在在构造TargetBlock(包括其子类)的时候,可以传入ExecutionDataflowBlockOptions参数,ExecutionDataflowBlockOptions对象里面有一个MaxDegreeOfParallelism属性,通过改制,可以控制该Block的同时处理任务的数量(可以理解成线程数)。

new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               }

 

5.构建Pipeline,连接Block

public Task Builder()
        {
            _startBlock = Process1();
            var process2Block = Process2();
            var process3Block = Process3();

            _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process3Block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process3 Complete,State{_.Status}");
                Console.WriteLine("所有任务处理完成");
            });

            return process3Block.Completion;
        }

通过

ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOption)

方法,可以把Block连接起来,即构建Pipeline,当DataflowLinkOptions对象的PropagateCompletion属性为true时,SorceBlock任务处理完成是,会把TargetBlock也标记为完成。

 

Block被标记为Complete 后,无法传入新的数据了,即不能再处理新的任务了。

 

6.Pipeline的运行

public void Process(string[] inputs)
        {
            if (inputs == null)
                return;
            foreach (var input in inputs)
            {
                _startBlock.Post(input);
            }
            _startBlock.Complete();
        }

Pipeline构建好后,我们只需要给第一个Block传入数据,该数据就会在管道内流动起来了,所有数据传入完成后,调用Block的Complete方法,把该Block标记为完成,就不可以再往里面Post数据了。

 

完整代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Tpl.Dataflow
{
    public class Pipeline
    {
        IPropagatorBlock<string, Dictionary<int, string>> _startBlock;
        private int _maxDegreeOfParallelism;

        public Pipeline(int maxDegreeOfParallelism)
        {
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        public void Process(string[] inputs)
        {
            if (inputs == null)
                return;
            foreach (var input in inputs)
            {
                _startBlock.Post(input);
            }
            _startBlock.Complete();
        }

        public Task Builder()
        {
            _startBlock = Process1();
            var process2Block = Process2();
            var process3Block = Process3();

            _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });

            process3Block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process3 Complete,State{_.Status}");
                Console.WriteLine("所有任务处理完成");
            });

            return process3Block.Completion;
        }

        private IPropagatorBlock<string, Dictionary<int, string>> Process1()
        {
            var bufferBlock = new BufferBlock<Dictionary<int, string>>();
            var actionBlock = new ActionBlock<string>(x =>
              {
                  Console.WriteLine($"Process1 处理中:{x}");
                  Thread.Sleep(5000);
                  var dic = new Dictionary<int, string> { { 0, x } };
                  dic.Add(1, "Process1");
                  bufferBlock.Post(dic);
              }, new ExecutionDataflowBlockOptions
              {
                  MaxDegreeOfParallelism = _maxDegreeOfParallelism
              });
            actionBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process1 Complete,State{_.Status}");
                bufferBlock.Complete();
            });
            return DataflowBlock.Encapsulate(actionBlock, bufferBlock);
        }

        private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2()
        {
            var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>
                  {
                      Console.WriteLine($"Process2 处理中:{dic.First().Value}");
                      Thread.Sleep(5000);
                      dic.Add(2, "Process2");
                      return dic;
                  }, new ExecutionDataflowBlockOptions
                  {
                      MaxDegreeOfParallelism = _maxDegreeOfParallelism
                  }
               );

            block.Completion.ContinueWith(_ =>
            {
                Console.WriteLine($"Process2 Complete,State{_.Status}");
            });

            return block;
        }

        private ITargetBlock<Dictionary<int, string>> Process3()
        {
            var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>
               {
                   Console.WriteLine($"Process3 处理中:{dic.First().Value}");
                   Thread.Sleep(5000);
                   dic.Add(3, "Process3");
                   Console.WriteLine("Dic中的内容如下:");
                   foreach (var item in dic)
                   {
                       Console.Write($"{item.Key}:{item.Value}||");
                   }
                   Console.WriteLine();
               }, new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = _maxDegreeOfParallelism
               });
            return actionBlock;
        }
    }
}

 

Main方法如下:

static void Main(string[] args)
        {
            Console.WriteLine("请输入管道并发数:");
            if (int.TryParse(Console.ReadLine(), out int max))
            {
                var pipeline = new Pipeline(max);
                var task = pipeline.Builder();
                pipeline.Process(new[] { "", "", "", "" });
                task.Wait();
                Console.ReadKey();
            }
        }

 

测试运行如图:

image

我来解释一下,为什么是这么运行的,因为把管道的并行度设置为2,所以每个Block可以同时处理两个任务,所以,如果给管道传入四个字符 ,每个字符作为一个任务,假设传入  “码农阿宇”四个任务,会时这样的一个过程…..

  1. 码   农  两个首先进入Process1,
  2. 处理完成后,码  农   两个任务流出,
  3. Process1位置空出来, 阿  宇 两个任务流入 Process1,
  4. 码  农 两个任务流向 Process2,
  5. 阿  宇 从 Process1 处理完成后流出,此时Process1任务完成
  6. 码  农 流出 Process2 ,同时 阿 宇  流入 Process2 ……
  7. 依此类推….

 

该项目Github地址: https://github.com/liuzhenyulive/Tpl-Dataflow-Demo

参考文献:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

码字不易,如果对您有用,欢迎推荐和关注,谢谢Flirt male

相关文章
|
1月前
|
C#
一个.NET开源、轻量级的运行耗时统计库 - MethodTimer
一个.NET开源、轻量级的运行耗时统计库 - MethodTimer
|
15天前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
37 5
|
1月前
|
人工智能 自然语言处理 API
适用于 .NET 稳定的官方OpenAI库
适用于 .NET 稳定的官方OpenAI库
|
1月前
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
42 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
15天前
|
弹性计算 开发框架 安全
基于云效 Windows 构建环境和 Nuget 制品仓库进行 .Net 应用开发
本文将基于云效 Flow 流水线 Windows 构建环境和云效 Packages Nuget 制品仓库手把手教你如何开发并部署一个 .NET 应用,从环境搭建到实战应用发布的详细教程,帮助你掌握 .NET 开发的核心技能。
|
23天前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
26 3
|
1月前
|
传感器 人工智能 供应链
.NET开发技术在数字化时代的创新作用,从高效的开发环境、强大的性能表现、丰富的库和框架资源等方面揭示了其关键优势。
本文深入探讨了.NET开发技术在数字化时代的创新作用,从高效的开发环境、强大的性能表现、丰富的库和框架资源等方面揭示了其关键优势。通过企业级应用、Web应用及移动应用的创新案例,展示了.NET在各领域的广泛应用和巨大潜力。展望未来,.NET将与新兴技术深度融合,拓展跨平台开发,推动云原生应用发展,持续创新。
35 4
|
1月前
|
开发框架 安全 .NET
.NET使用Moq开源模拟库简化单元测试
.NET使用Moq开源模拟库简化单元测试~
|
开发框架 前端开发 .NET
ASP.NET Core 核心特性学习笔记「下」
ASP.NET Core 核心特性学习笔记「下」
|
开发框架 前端开发 中间件
ASP.NET Core 核心特性学习笔记「上」
ASP.NET Core 核心特性学习笔记「上」