每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客!
当然,题外话说多了,咱进入正题!
背景
基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。
在.NET Framework 4 以前,为了让共享的数组、列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作。
如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。
System.Collenctions和System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。
下面看代码,代码中并没有实现线程安全和串行化:
class Program
{
private static object o = new object(); private static List<Product> _Products { get; set; } /* coder:天才卧龙 * 代码中 创建三个并发线程 来操作_Products 集合 * System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化 */ static void Main(string[] args) { _Products = new List<Product>(); /*创建任务 t1 t1 执行 数据集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t2 t2 执行 数据集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t3 t3 执行 数据集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); Console.WriteLine(_Products.Count); Console.ReadLine(); } /*执行集合数据添加操作*/ static void AddProducts() { Parallel.For(0, 1000, (i) => { Product product = new Product(); product.Name = "name" + i; product.Category = "Category" + i; product.SellPrice = i; _Products.Add(product); }); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
View Code
代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下:
为此我们需要采用Lock关键字,来确保每次只有一个线程来访问 _Products.Add(product); 这个方法,代码如下:
class Program
{
private static object o = new object(); private static List<Product> _Products { get; set; } /* coder:天才卧龙 * 代码中 创建三个并发线程 来操作_Products 集合 * System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化 */ static void Main(string[] args) { _Products = new List<Product>(); /*创建任务 t1 t1 执行 数据集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t2 t2 执行 数据集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t3 t3 执行 数据集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); Console.WriteLine("当前数据量为:" + _Products.Count); Console.ReadLine(); } /*执行集合数据添加操作*/ static void AddProducts() { Parallel.For(0, 1000, (i) => { Product product = new Product(); product.Name = "name" + i; product.Category = "Category" + i; product.SellPrice = i; lock (o) { _Products.Add(product); } }); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
View Code
但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。
System.Collections.Concurrent
.NET Framework 4提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。
需要注意的是:
线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。
为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。
1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。
2.ConcurrentBag 提供对象的线程安全的无序集合
3.ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合
4.ConcurrentQueue 提供线程安全的先进先出集合
5.ConcurrentStack 提供线程安全的后进先出集合
这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。
ConcurrentQueue
ConcurrentQueue 是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:
class Program
{
private static object o = new object(); /*定义 Queue*/ private static Queue<Product> _Products { get; set; } private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; } /* coder:天才卧龙 * 代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况 */ static void Main(string[] args) { Thread.Sleep(1000); _Products = new Queue<Product>(); Stopwatch swTask = new Stopwatch(); swTask.Start(); /*创建任务 t1 t1 执行 数据集合添加操作*/ Task t1 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t2 t2 执行 数据集合添加操作*/ Task t2 = Task.Factory.StartNew(() => { AddProducts(); }); /*创建任务 t3 t3 执行 数据集合添加操作*/ Task t3 = Task.Factory.StartNew(() => { AddProducts(); }); Task.WaitAll(t1, t2, t3); swTask.Stop(); Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count); Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds); Thread.Sleep(1000); _ConcurrenProducts = new ConcurrentQueue<Product>(); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); /*创建任务 tk1 tk1 执行 数据集合添加操作*/ Task tk1 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); /*创建任务 tk2 tk2 执行 数据集合添加操作*/ Task tk2 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); /*创建任务 tk3 tk3 执行 数据集合添加操作*/ Task tk3 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); Task.WaitAll(tk1, tk2, tk3); swTask1.Stop(); Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count); Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds); Console.ReadLine(); } /*执行集合数据添加操作*/ static void AddProducts() { Parallel.For(0, 30000, (i) => { Product product = new Product(); product.Name = "name" + i;