C#并行编程-并发集合

简介: 原文:C#并行编程-并发集合菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。 背景 基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。
原文: C#并行编程-并发集合

菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。

背景

基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。

在.NET Framework 4 以前,为了让共享的数组、列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作。

如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。

System.Collenctions和System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。

下面看代码,代码中并没有实现线程安全和串行化:

    class Program
    {
        private static object o = new object();
        private static List<Product> _Products { get; set; }
        /*  coder:释迦苦僧  
         *  代码中 创建三个并发线程 来操作_Products 集合
         *  System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
         */
        static void Main(string[] args)
        {
            _Products = new List<Product>();
            /*创建任务 t1  t1 执行 数据集合添加操作*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t2  t2 执行 数据集合添加操作*/
            Task t2 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t3  t3 执行 数据集合添加操作*/
            Task t3 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            Task.WaitAll(t1, t2, t3);
            Console.WriteLine(_Products.Count);
            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddProducts()
        {
            Parallel.For(0, 1000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                _Products.Add(product);
            });

        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下:
 

为此我们需要采用Lock关键字,来确保每次只有一个线程来访问  _Products.Add(product); 这个方法,代码如下:

    class Program
    {
        private static object o = new object();
        private static List<Product> _Products { get; set; }
        /*  coder:释迦苦僧  
         *  代码中 创建三个并发线程 来操作_Products 集合
         *  System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
         */
        static void Main(string[] args)
        {
            _Products = new List<Product>();
            /*创建任务 t1  t1 执行 数据集合添加操作*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t2  t2 执行 数据集合添加操作*/
            Task t2 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t3  t3 执行 数据集合添加操作*/
            Task t3 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            Task.WaitAll(t1, t2, t3);
            Console.WriteLine("当前数据量为:" + _Products.Count);
            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddProducts()
        {
            Parallel.For(0, 1000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                lock (o)
                {
                    _Products.Add(product);
                }
            });

        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。

 

System.Collections.Concurrent

.NET Framework 4提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。

需要注意的是:

线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。

 

为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。

1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。

2.ConcurrentBag 提供对象的线程安全的无序集合

3.ConcurrentDictionary  提供可有多个线程同时访问的键值对的线程安全集合

4.ConcurrentQueue   提供线程安全的先进先出集合

5.ConcurrentStack   提供线程安全的后进先出集合

这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。

 

ConcurrentQueue  

ConcurrentQueue 是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:

    class Program
    {
        private static object o = new object();
        /*定义 Queue*/
        private static Queue<Product> _Products { get; set; }
        private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
        /*  coder:释迦苦僧  
         *  代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况
         */
        static void Main(string[] args)
        {
            Thread.Sleep(1000);
            _Products = new Queue<Product>();
            Stopwatch swTask = new Stopwatch();
            swTask.Start();

            /*创建任务 t1  t1 执行 数据集合添加操作*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t2  t2 执行 数据集合添加操作*/
            Task t2 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t3  t3 执行 数据集合添加操作*/
            Task t3 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });

            Task.WaitAll(t1, t2, t3);
            swTask.Stop();
            Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
            Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);

            Thread.Sleep(1000);
            _ConcurrenProducts = new ConcurrentQueue<Product>();
            Stopwatch swTask1 = new Stopwatch();
            swTask1.Start();

            /*创建任务 tk1  tk1 执行 数据集合添加操作*/
            Task tk1 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });
            /*创建任务 tk2  tk2 执行 数据集合添加操作*/
            Task tk2 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });
            /*创建任务 tk3  tk3 执行 数据集合添加操作*/
            Task tk3 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });

            Task.WaitAll(tk1, tk2, tk3);
            swTask1.Stop();
            Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
            Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddProducts()
        {
            Parallel.For(0, 30000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                lock (o)
                {
                    _Products.Enqueue(product);
                }
            });

        }
        /*执行集合数据添加操作*/
        static void AddConcurrenProducts()
        {
            Parallel.For(0, 30000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                _ConcurrenProducts.Enqueue(product);
            });

        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

需要注意的是,代码中的输出时间并不能够完全正确的展示出并发代码下的ConcurrentQueue性能,采用ConcurrentQueue在一定程度上也带来了损耗,如下图所示:

ConcurrentQueue 还有另外两种方法:TryDequeue  尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

    class Program
    {
        private static object o = new object();
        private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
        /*  coder:释迦苦僧  
         *  ConcurrentQueue  下的 TryPeek 和 TryDequeue
         */
        static void Main(string[] args)
        {
            _ConcurrenProducts = new ConcurrentQueue<Product>();
            /*执行添加操作*/
            Console.WriteLine("执行添加操作");
            Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);
            Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
            /*执行TryPeek操作   尝试返回不移除*/
            Console.WriteLine("执行TryPeek操作   尝试返回不移除");
            Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);
            Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);

            /*执行TryDequeue操作  尝试返回并移除*/
            Console.WriteLine("执行TryDequeue操作  尝试返回并移除");
            Parallel.Invoke(DequeueConcurrenProducts, DequeueConcurrenProducts);
            Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);

            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddConcurrenProducts()
        {
            Parallel.For(0, 100, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                _ConcurrenProducts.Enqueue(product);
            });
        }
        /*尝试返回 但不移除*/
        static void PeekConcurrenProducts()
        {
            Parallel.For(0, 2, (i) =>
            {
                Product product = null;
                bool excute = _ConcurrenProducts.TryPeek(out product);
                Console.WriteLine(product.Name);
            });
        }
        /*尝试返回 并 移除*/
        static void DequeueConcurrenProducts()
        {
            Parallel.For(0, 2, (i) =>
            {
                Product product = null;
                bool excute = _ConcurrenProducts.TryDequeue(out product);
                Console.WriteLine(product.Name);
            });
        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

需要注意 TryDequeue  和  TryPeek 的无序性,在多线程下

ConcurrentStack  是完全无锁的,能够支持并发的添加元素,后进先出。下面贴代码,详解见注释:

        private static object o = new object();
        /*定义 Stack*/
        private static Stack<Product> _Products { get; set; }
        private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }
        /*  coder:释迦苦僧  
         *  代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 30000 条数据 查看 一般Stack 和 多线程安全下的 ConcurrentStack 执行情况
         */
        static void Main(string[] args)
        {
            Thread.Sleep(1000);
            _Products = new Stack<Product>();
            Stopwatch swTask = new Stopwatch();
            swTask.Start();

            /*创建任务 t1  t1 执行 数据集合添加操作*/
            Task t1 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t2  t2 执行 数据集合添加操作*/
            Task t2 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });
            /*创建任务 t3  t3 执行 数据集合添加操作*/
            Task t3 = Task.Factory.StartNew(() =>
            {
                AddProducts();
            });

            Task.WaitAll(t1, t2, t3);
            swTask.Stop();
            Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
            Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);

            Thread.Sleep(1000);
            _ConcurrenProducts = new ConcurrentStack<Product>();
            Stopwatch swTask1 = new Stopwatch();
            swTask1.Start();

            /*创建任务 tk1  tk1 执行 数据集合添加操作*/
            Task tk1 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });
            /*创建任务 tk2  tk2 执行 数据集合添加操作*/
            Task tk2 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });
            /*创建任务 tk3  tk3 执行 数据集合添加操作*/
            Task tk3 = Task.Factory.StartNew(() =>
            {
                AddConcurrenProducts();
            });

            Task.WaitAll(tk1, tk2, tk3);
            swTask1.Stop();
            Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
            Console.WriteLine("ConcurrentStack<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddProducts()
        {
            Parallel.For(0, 30000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                lock (o)
                {
                    _Products.Push(product);
                }
            });

        }
        /*执行集合数据添加操作*/
        static void AddConcurrenProducts()
        {
            Parallel.For(0, 30000, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                _ConcurrenProducts.Push(product);
            });

        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

ConcurrentStack 还有另外两种方法:TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

    class Program
    {
        private static object o = new object();
        private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }
        /*  coder:释迦苦僧  
         *  ConcurrentQueue  下的 TryPeek 和 TryPop
         */
        static void Main(string[] args)
        {
            _ConcurrenProducts = new ConcurrentStack<Product>();
            /*执行添加操作*/
            Console.WriteLine("执行添加操作");
            Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);
            Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);
            /*执行TryPeek操作   尝试返回不移除*/
            Console.WriteLine("执行TryPeek操作   尝试返回不移除");
            Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);
            Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);

            /*执行TryDequeue操作  尝试返回并移除*/
            Console.WriteLine("执行TryPop操作  尝试返回并移除");
            Parallel.Invoke(PopConcurrenProducts, PopConcurrenProducts);
            Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);

            Console.ReadLine();
        }

        /*执行集合数据添加操作*/
        static void AddConcurrenProducts()
        {
            Parallel.For(0, 100, (i) =>
            {
                Product product = new Product();
                product.Name = "name" + i;
                product.Category = "Category" + i;
                product.SellPrice = i;
                _ConcurrenProducts.Push(product);
            });
        }
        /*尝试返回 但不移除*/
        static void PeekConcurrenProducts()
        {
            Parallel.For(0, 2, (i) =>
            {
                Product product = null;
                bool excute = _ConcurrenProducts.TryPeek(out product);
                Console.WriteLine(product.Name);
            });
        }
        /*尝试返回 并 移除*/
        static void PopConcurrenProducts()
        {
            Parallel.For(0, 2, (i) =>
            {
                Product product = null;
                bool excute = _ConcurrenProducts.TryPop(out product);
                Console.WriteLine(product.Name);
            });
        }
    }

    class Product
    {
        public string Name { get; set; }
        public string Category { get; set; }
        public int SellPrice { get; set; }
    }
View Code

对于并发下的其他集合,我这边就不做代码案列了,大家可以通过下面的链接查看,如有问题,欢迎指正

http://msdn.microsoft.com/zh-cn/library/system.collections.concurrent(v=vs.110).aspx

 

作者:释迦苦僧 出处:http://www.cnblogs.com/woxpp/p/3935557.html
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。

目录
相关文章
|
21天前
|
开发框架 前端开发 .NET
C#编程与Web开发
【4月更文挑战第21天】本文探讨了C#在Web开发中的应用,包括使用ASP.NET框架、MVC模式、Web API和Entity Framework。C#作为.NET框架的主要语言,结合这些工具,能创建动态、高效的Web应用。实际案例涉及企业级应用、电子商务和社交媒体平台。尽管面临竞争和挑战,但C#在Web开发领域的前景将持续拓展。
|
21天前
|
SQL 开发框架 安全
C#编程与多线程处理
【4月更文挑战第21天】探索C#多线程处理,提升程序性能与响应性。了解C#中的Thread、Task类及Async/Await关键字,掌握线程同步与安全,实践并发计算、网络服务及UI优化。跟随未来发展趋势,利用C#打造高效应用。
|
21天前
|
存储 安全 网络安全
C#编程的安全性与加密技术
【4月更文挑战第21天】C#在.NET框架支持下,以其面向对象和高级特性成为安全软件开发的利器。本文探讨C#在安全加密领域的应用,包括使用System.Security.Cryptography库实现加密算法,利用SSL/TLS保障网络传输安全,进行身份验证,并强调编写安全代码的重要性。实际案例涵盖在线支付、企业应用和文件加密,展示了C#在应对安全挑战的同时,不断拓展其在该领域的潜力和未来前景。
|
21天前
|
人工智能 C# 开发者
C#编程中的图形界面设计
【4月更文挑战第21天】本文探讨了C#在GUI设计中的应用,介绍了Windows Forms、WPF和UWP等常用框架,强调了简洁界面、响应式设计和数据绑定等最佳实践。通过实际案例,展示了C#在企业应用、游戏开发和移动应用中的GUI实现。随着技术发展,C#在GUI设计的未来将趋向于跨平台、更丰富的组件和AI集成,为开发者创造更多可能性。
|
21天前
|
存储 算法 C#
C#编程与数据结构的结合
【4月更文挑战第21天】本文探讨了C#如何结合数据结构以构建高效软件,强调数据结构在C#中的重要性。C#作为面向对象的编程语言,提供内置数据结构如List、Array和Dictionary,同时也支持自定义数据结构。文章列举了C#实现数组、链表、栈、队列等基础数据结构的示例,并讨论了它们在排序、图算法和数据库访问等场景的应用。掌握C#数据结构有助于编写高性能、可维护的代码。
|
21天前
|
开发框架 Linux C#
C#编程的跨平台应用
【4月更文挑战第21天】C#与.NET Core的结合使得跨平台应用开发变得高效便捷,提供统一编程模型和高性能。丰富的类库、活跃的社区支持及Visual Studio Code、Xamarin等工具强化了其优势。广泛应用在企业系统、云服务和游戏开发中,虽面临挑战,但随着技术进步,C#在跨平台开发领域的前景广阔。
|
21天前
|
人工智能 C# 云计算
C#编程的未来发展趋向
【4月更文挑战第21天】C#编程未来将深化跨平台支持,强化云计算与容器技术集成,如.NET Core、Docker。在AI和ML领域,C#将提供更丰富框架,与AI芯片集成。语言和工具将持续创新,优化异步编程,如Task、async和await,提升多核性能。开源生态的壮大将吸引更多开发者,共创更多机遇。
|
2月前
|
C#
24. C# 编程:用户设定敌人初始血值的实现
24. C# 编程:用户设定敌人初始血值的实现
23 0
|
21天前
|
程序员 C#
C#编程中的面向对象编程思想
【4月更文挑战第21天】本文探讨了C#中的面向对象编程,包括类、对象、封装、继承和多态。类是对象的抽象,定义属性和行为;对象是类的实例。封装隐藏内部细节,只暴露必要接口。继承允许类复用和扩展属性与行为,而多态使不同类的对象能通过相同接口调用方法。C#通过访问修饰符实现封装,使用虚方法和抽象方法实现多态。理解并应用这些概念,能提升代码的清晰度和可扩展性,助你成为更好的C#程序员。
|
21天前
|
开发框架 安全 .NET
C#编程高手的成长之路
【4月更文挑战第21天】本文揭示了成为C#编程高手的路径:牢固掌握基础知识和面向对象编程,深入了解C#特性如泛型和委托,精通ASP.NET等框架工具,养成良好编程习惯,持续学习实践并参与开源项目,勇于挑战创新。通过这些步骤,不断提升编程技能,迈向C#编程的巅峰。