2.1.3 分析
Parallel.Invoke 的使用过程中我们要注意以下特点:
没有特定的顺序,每个Task可能是不同的线程去执行,也可能是相同的;
Invoke中的方法全部执行完才返回,这样对我们以后设计并行的时候,要考虑每个Task任务尽可能差不多,如果相差很大,比如一个时间非常长,其他都比较短,这样一个线程可能会影响整个任务的性能。这点非常重要;
但是即使有异常在执行过程中也同样会完成,他只是一个很简单的并行处理方法,特点就是简单,不需要我们考虑线程的问题。主要Framework已经为我们控制好线程池的问题。
Invoke在每次调用都有开销的,不一定并行一定比串行好,要根据实际情况,内核环境多次测试调优才可以。如果在设计Invoke中有个需要很长时间,这样会影响整个Invoke的效率和性能,这个我们在设计每个task时候必须去考虑的。
Invoke 参数是委托方法。
异常处理比较复杂。
ps:如果其中有一个异常怎么办? 带做这个问题修改了增加了一个Task4.
/// <summary> /// Invoke方式一 action /// </summary> public void Client2() { Stopwatch stopWatch = new Stopwatch(); Console.WriteLine("主线程:{0}线程ID : {1};开始", "Client1", Thread.CurrentThread.ManagedThreadId); stopWatch.Start(); try { Parallel.Invoke( () => Task1("task1"), () => Task2("task2"), () => Task3("task3"), delegate() { throw new Exception("我这里发送了异常"); }); } catch (AggregateException ae) { foreach (var ex in ae.InnerExceptions) Console.WriteLine(ex.Message); } stopWatch.Stop(); Console.WriteLine("主线程:{0}线程ID : {1};结束,共用时{2}ms", "Client1", Thread.CurrentThread.ManagedThreadId, stopWatch.ElapsedMilliseconds); }
不建议在并行程序中写异常
2.2 ParallelOptions 参数模式
2.2.1 ParallelOptions类
ParallelOptions options = new ParallelOptions(); //指定使用的硬件线程数为4 options.MaxDegreeOfParallelism = 4;
有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。
2.2.2 示例代码
下述代码的执行原理为:
程序在执行过程中线程数码不超过3个
CancellationTokenSource/CancellationToken控制任务的取消。
// 定义CancellationTokenSource 控制取消 readonly CancellationTokenSource _cts = new CancellationTokenSource(); /// <summary> /// Invoke方式一 action /// </summary> public void Client3() { Console.WriteLine("主线程:{0}线程ID : {1};开始{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now); var po = new ParallelOptions { CancellationToken = _cts.Token, // 控制线程取消 MaxDegreeOfParallelism = 3 // 设置最大的线程数3,仔细观察线程ID变化 }; Parallel.Invoke(po, () => Task1("task1"), () => Task5(po), Task6); Console.WriteLine("主线程:{0}线程ID : {1};结束{2}", "Client3", Thread.CurrentThread.ManagedThreadId, DateTime.Now); } private void Task1(string data) { Thread.Sleep(5000); Console.WriteLine("任务名:{0}线程ID : {1}", data, Thread.CurrentThread.ManagedThreadId); } // 打印数字 private void Task5(ParallelOptions po) { Console.WriteLine("进入Task5线程ID : {0}", Thread.CurrentThread.ManagedThreadId); int i = 0; while (i < 100) { // 判断是否已经取消 if (po.CancellationToken.IsCancellationRequested) { Console.WriteLine("已经被取消。"); return; } Thread.Sleep(100); Console.Write(i + " "); Interlocked.Increment(ref i); } } /// <summary> /// 10秒后取消 /// </summary> private void Task6() { Console.WriteLine("进入取消任务,Task6线程ID : {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(1000 * 10); _cts.Cancel(); Console.WriteLine("发起取消请求..........."); }
2.3 中断并行
2.3.1 break与stop
如何中途退出并行循环?
是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中
提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 20000000, (i, state) => { if (bag.Count == 1000) { //state.Break(); state.Stop(); return; } bag.Add(i); }); Console.WriteLine("当前集合有{0}个元素。", bag.Count); } } }
2.3.2 Cancel
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { public static void Main() { var cts = new CancellationTokenSource(); var ct = cts.Token; Task.Factory.StartNew(() => fun(ct)); Console.ReadKey(); //Thread.Sleep(3000); cts.Cancel(); Console.WriteLine("任务取消了!"); } static void fun(CancellationToken token) { Parallel.For(0, 100000, new ParallelOptions { CancellationToken = token }, (i) => { Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId); }); } } }