并行Linq

简介: 并行Linq

并行LINQ

并行查询

.NET4在System.Linq名称空间中包含一个新类ParalleIEnumerable ,可以分解查询的工作使其分布在多个线程上。尽管Enmerable类给IEnunerable<T>接口定义了扩展方法,但


ParalleIEnumerable 类的大多数扩展方法是ParallelQuery<TSource>类的扩展。一个重要的例外是AsParallel()方法,它扩展了IEnumerable<TSource>接口,返回ParallelQuery<TSource>类,所以正常的集合类可以以平行方式查询。


例:


   const int arraySize = 100000000;


           var data = new int[arraySize];


           var r = new Random();


           for (int i = 0; i < arraySize; i++)


           {

               data[i] = r.Next(40);


           }


   现在可以使用LINQ查询筛选数据,获取筛选数据的总和。该查询用where子句定义了一个筛选器,仅会中对应值小于20的项,接着调用聚合函数Sum()方法 。与前面的LINQ查询的唯一区别是,这次调用了AsParallel()方法。


   var sum = (from x in data.AsParallel()


                                  where x < 20


                                  select x).Sum();


与前面的LINQ查询一样,编译器会修改语法,以调用AsParallel()、Where()、Select()和Sum()方法。AsParallel()方法用ParallelEnumerable类定义,以扩展IEnumerable<T>接口,所以对简单的数据调用它。AsParallel()方法返回ParallelQuery<TSource>。因为返回的类型,所以编译器选择的Where()方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。在下面的代码中Select()和Sum()方法也来自ParallelEnumerable类。与Enumerable类的实现代码相反,对于ParallelEnumerable类,查询是分区的,以便多个线程可以同时处理该查询。数组可以分为多个部分,其中每个部分由不同的线程处理,以筛选其余项。完成分区的工作后,就需要合并,获得所有部分的总和。


       var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();


   运行这行代码就会启动任务管理器,这样就可以看出系统的所有CPU都在忙碌。如果删除AsParallel()方法,就不可能使用多个CPU。当然,如果系统上没有多个CPU,就不会看到并行版本带来改进。


分区器

AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partition类。通过它,可以影响要创建的分区。


   Partitioner类用System.Collection.Concurrent命名空间定义,并且有不同变体。Create方法接受实现了IList<T>类的数组或对象。根据这一点,以及类型的参数loadBalance和该方法的一些重载版本,会返回一个不同的Partitioner类型。对于数组,.Net4包含派生自抽象基类OrderablePartitioner<TSource>的DynamicPartitionerForArray<TSource>类和StaticPartitionerForArray<TSource>类。


var q1 = (from x in Partitioner.Create(data).AsParallel()


                     where x < 20


                    select x).Sum();


也可以调用WithExecutionMode()和WithDegreeOfParallelism()方法可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。默认情况下,并行LINQ避免使用系统开销很高的并行机制。对于WithDegreeOfParallelism()方法,可以传递一个整数值,以指定并行运行的最大任务数。


例:


 const int arraySize = 100000000;


           var data = new int[arraySize];


           var r = new Random();


           for (int i = 0; i < arraySize; i++)


           {

               data[i] = r.Next(40);


           }


           Stopwatch watch = new Stopwatch();


           watch.Start();


           //一种写法,没有添加动态负载均衡,执行完所需要的时间1300毫秒


           var q1 = (from x in Partitioner.Create(data).AsParallel()


                     where x < 80


                    select x).Sum();


       //第二种写法,添加了动态负载均衡,执行完所需要的时间为660毫秒。


var q1 = (from x in Partitioner.Create(data,true).AsParallel()


                     where x < 80


                    select x).Sum();


           watch.Stop();


           Console.WriteLine(watch.ElapsedMilliseconds.ToString());


取消

.Net提供了一种标准方式,来取消长时间运行的任务,这也适用于并行LINQ。要取消长时间的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。该查询在单独的线程中运行,在该线程中,捕获一个OperationCancelException类型的异常。如果取消了查询,就触发这个异常。在主线程中,调用CancellationTokenSource类的Cancel()方法可以取消任务。


 const int arraySize = 100000000;


           var data = new int[arraySize];


           var r = new Random();


           for (int i = 0; i < arraySize; i++)


           {

               data[i] = r.Next(40);


           }


           var cts = new CancellationTokenSource();


           new Thread(() =>


               {

                   try


                   {

                       var sum = (from x in data.AsParallel().WithCancellation(cts.Token)


                                  where x < 80


                                  select x).Sum();


                       Console.WriteLine("query finished, sum: {0}", sum);


                   }


                   catch (OperationCanceledException ex)


                   {

                       Console.WriteLine(ex.Message);


                   }


               }).Start();


           Console.WriteLine("query started");


           Console.Write("cancel? ");


           int input = Console.Read();


           if (input == 'Y' || input == 'y')


           {

               // cancel!


               cts.Cancel();


              }


 


目录
相关文章
|
开发框架 .NET 大数据
C#使用linq查询大数据集的方法
这篇文章主要介绍了C#使用linq查询大数据集的方法,涉及C#调用linq进行数据查询的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
2120 0
|
.NET 开发框架 存储
|
SQL .NET 开发框架