技术背景
硬件线程和软件线程
多核处理器带有一个以上的物理内核:物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行。
硬件线程也称为逻辑内核,一个物理内核可能会使用超线程技术提供多个硬件线程,所以一个硬件线程并不代表一个物理内核。
程序通过Environment.ProcessorCount
得到的就是逻辑内核(本人的机器是i5-5300U 虚拟4核), Windows中每个运行的程序都是一个进程,每一个进程都会创建并运行一个或多个线程,这些线程称为软件线程,硬件线程就像是一条泳道,而软件线程就是在其中游泳的人。
并行场景
.NET引入的Task Parallel Library(任务并行库,TPL),动态地扩展并发度,以最有效的方式使用所有可用的处理器。
另外TPL支持分区工作、支持基于ThreadPool调度、支持取消异步操作、支持状态管理。
通过TPL专注与让程序完成你业务意义上的任务,同时最大限度的提高程序性能。
TPL同时支持数据并行、任务并行和流水线Dataflow
1.数据并行:有大量数据需要处理,并且必须对每一份数据执行同样的操作;
2.任务并行:通过任务并发运行不同的操作;
3.流水线:任务并行和数据并行的结合体(需要引入System.Threading.Tasks.Dataflow组件库)
其中1、3 已经在上文演示,本文就随手拿数据并行、任务并行聊一聊。
编程实践
1. 数据并行
找到100000以内素数的个数
上文[共享内存并发模型],代码可做如下优化:
由每个线程独立计算线程内迭代产生的素数和,最后再对几个和求和。
using System; using System.Threading.Tasks; using System.Collections; using System.Collections.Generic; using System.Threading; using System.Diagnostics; /// <summary> /// 利用并行编程库Parallel,计算100000内素数的个数 /// </summary> namespace Paralleler { class Program { static void Main(string[] args) { Stopwatch sw = new Stopwatch(); sw.Start(); ShareMemory(); sw.Stop(); Console.WriteLine($"优化后的共享内存并发模型耗时:{sw.Elapsed}"); } static void ShareMemory() { var sum = 0; Parallel.For(1, 100000 + 1, () => 0, (x, state, local) => { var f = true; if (x == 1) f = false; for (int i = 2; i <= x / 2; i++) { if (x % i == 0) // 被[2,x/2]任一数字整除,就不是质数 f = false; } if (f == true) local++; return local; }, local => { Interlocked.Add(ref sum, local); } ); Console.WriteLine($"1-100000内质数的个数是{sum}"); } } }
参数1,2 表示数据并行要操作的对象;
参数3localInit
表示某线程内迭代的初始值,将会作为参数4body
委托的第3个参数,只在线程第一次使用;
参数4body
表示每个迭代都需要经历的执行体, 这里以线程为单元处理迭代;
2. 任务并行
让许多方法并行运行的最简单的方法就是使用Parallel类的Invoke
方法,Invoke方法接受一个Action的参数组
void System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
这段代码会创建指向每一个方法的委托。
没有特定的执行顺序
Parallel.Invoke方法只有在4个方法全部完成之后才会返回。它至少需要4个硬件线程才足以让这4个方法并发运行。
但并不保证这4个方法能够同时启动运行,如果一个或者多个内核处于繁忙状态,那么底层的调度逻辑可能会延迟某些方法的初始化执行。
捕捉并行循环中发生的异常
当并行迭代中调用的委托抛出异常,这个异常没有在委托中被捕获到时,就会变成一组异常,新的System.AggregateException
负责处理这一组异常。