C#并发处理-锁OR线程安全?

简介: 每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客! 当然,题外话说多了,咱进入正题! 背景 基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。

每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客!

当然,题外话说多了,咱进入正题!

背景

基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。

在.NET Framework 4 以前,为了让共享的数组、列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作。

如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。

System.Collenctions和System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。

下面看代码,代码中并没有实现线程安全和串行化:

class Program
  {
    private static object o = new object();   private static List<Product> _Products { get; set; }   /* coder:天才卧龙   * 代码中 创建三个并发线程 来操作_Products 集合   * System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化   */   static void Main(string[] args)   {    _Products = new List<Product>();    /*创建任务 t1 t1 执行 数据集合添加操作*/    Task t1 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t2 t2 执行 数据集合添加操作*/    Task t2 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t3 t3 执行 数据集合添加操作*/    Task t3 = Task.Factory.StartNew(() =>    {     AddProducts();    });    Task.WaitAll(t1, t2, t3);    Console.WriteLine(_Products.Count);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddProducts()   {    Parallel.For(0, 1000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     _Products.Add(product);    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下:

 

为此我们需要采用Lock关键字,来确保每次只有一个线程来访问  _Products.Add(product); 这个方法,代码如下:

class Program
  {
    private static object o = new object();   private static List<Product> _Products { get; set; }   /* coder:天才卧龙   * 代码中 创建三个并发线程 来操作_Products 集合   * System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化   */   static void Main(string[] args)   {    _Products = new List<Product>();    /*创建任务 t1 t1 执行 数据集合添加操作*/    Task t1 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t2 t2 执行 数据集合添加操作*/    Task t2 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t3 t3 执行 数据集合添加操作*/    Task t3 = Task.Factory.StartNew(() =>    {     AddProducts();    });    Task.WaitAll(t1, t2, t3);    Console.WriteLine("当前数据量为:" + _Products.Count);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddProducts()   {    Parallel.For(0, 1000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     lock (o)     {      _Products.Add(product);     }    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。

System.Collections.Concurrent

.NET Framework 4提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。

需要注意的是:

线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。

为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。

1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。

2.ConcurrentBag 提供对象的线程安全的无序集合

3.ConcurrentDictionary  提供可有多个线程同时访问的键值对的线程安全集合

4.ConcurrentQueue   提供线程安全的先进先出集合

5.ConcurrentStack   提供线程安全的后进先出集合

这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。

ConcurrentQueue 

ConcurrentQueue 是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:

class Program
  {
    private static object o = new object();   /*定义 Queue*/   private static Queue<Product> _Products { get; set; }   private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }   /* coder:天才卧龙   * 代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况   */   static void Main(string[] args)   {    Thread.Sleep(1000);    _Products = new Queue<Product>();    Stopwatch swTask = new Stopwatch();    swTask.Start();    /*创建任务 t1 t1 执行 数据集合添加操作*/    Task t1 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t2 t2 执行 数据集合添加操作*/    Task t2 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t3 t3 执行 数据集合添加操作*/    Task t3 = Task.Factory.StartNew(() =>    {     AddProducts();    });    Task.WaitAll(t1, t2, t3);    swTask.Stop();    Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);    Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);    Thread.Sleep(1000);    _ConcurrenProducts = new ConcurrentQueue<Product>();    Stopwatch swTask1 = new Stopwatch();    swTask1.Start();    /*创建任务 tk1 tk1 执行 数据集合添加操作*/    Task tk1 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    /*创建任务 tk2 tk2 执行 数据集合添加操作*/    Task tk2 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    /*创建任务 tk3 tk3 执行 数据集合添加操作*/    Task tk3 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    Task.WaitAll(tk1, tk2, tk3);    swTask1.Stop();    Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);    Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddProducts()   {    Parallel.For(0, 30000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     lock (o)     {      _Products.Enqueue(product);     }    });   }   /*执行集合数据添加操作*/   static void AddConcurrenProducts()   {    Parallel.For(0, 30000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     _ConcurrenProducts.Enqueue(product);    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

需要注意的是,代码中的输出时间并不能够完全正确的展示出并发代码下的ConcurrentQueue性能,采用ConcurrentQueue在一定程度上也带来了损耗,如下图所示:

ConcurrentQueue 还有另外两种方法:TryDequeue  尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

class Program
  {
    private static object o = new object();   private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }   /* coder:天才卧龙   * ConcurrentQueue 下的 TryPeek 和 TryDequeue   */   static void Main(string[] args)   {    _ConcurrenProducts = new ConcurrentQueue<Product>();    /*执行添加操作*/    Console.WriteLine("执行添加操作");    Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);    Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);    /*执行TryPeek操作 尝试返回不移除*/    Console.WriteLine("执行TryPeek操作 尝试返回不移除");    Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);    Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);    /*执行TryDequeue操作 尝试返回并移除*/    Console.WriteLine("执行TryDequeue操作 尝试返回并移除");    Parallel.Invoke(DequeueConcurrenProducts, DequeueConcurrenProducts);    Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddConcurrenProducts()   {    Parallel.For(0, 100, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     _ConcurrenProducts.Enqueue(product);    });   }   /*尝试返回 但不移除*/   static void PeekConcurrenProducts()   {    Parallel.For(0, 2, (i) =>    {     Product product = null;     bool excute = _ConcurrenProducts.TryPeek(out product);     Console.WriteLine(product.Name);    });   }   /*尝试返回 并 移除*/   static void DequeueConcurrenProducts()   {    Parallel.For(0, 2, (i) =>    {     Product product = null;     bool excute = _ConcurrenProducts.TryDequeue(out product);     Console.WriteLine(product.Name);    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

需要注意 TryDequeue  和  TryPeek 的无序性,在多线程下

ConcurrentStack   是完全无锁的,能够支持并发的添加元素,后进先出。下面贴代码,详解见注释:

private static object o = new object();
  /*定义 Stack*/   private static Stack<Product> _Products { get; set; }   private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }   /* coder:天才卧龙   * 代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 30000 条数据 查看 一般Stack 和 多线程安全下的 ConcurrentStack 执行情况   */   static void Main(string[] args)   {    Thread.Sleep(1000);    _Products = new Stack<Product>();    Stopwatch swTask = new Stopwatch();    swTask.Start();    /*创建任务 t1 t1 执行 数据集合添加操作*/    Task t1 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t2 t2 执行 数据集合添加操作*/    Task t2 = Task.Factory.StartNew(() =>    {     AddProducts();    });    /*创建任务 t3 t3 执行 数据集合添加操作*/    Task t3 = Task.Factory.StartNew(() =>    {     AddProducts();    });    Task.WaitAll(t1, t2, t3);    swTask.Stop();    Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);    Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);    Thread.Sleep(1000);    _ConcurrenProducts = new ConcurrentStack<Product>();    Stopwatch swTask1 = new Stopwatch();    swTask1.Start();    /*创建任务 tk1 tk1 执行 数据集合添加操作*/    Task tk1 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    /*创建任务 tk2 tk2 执行 数据集合添加操作*/    Task tk2 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    /*创建任务 tk3 tk3 执行 数据集合添加操作*/    Task tk3 = Task.Factory.StartNew(() =>    {     AddConcurrenProducts();    });    Task.WaitAll(tk1, tk2, tk3);    swTask1.Stop();    Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);    Console.WriteLine("ConcurrentStack<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddProducts()   {    Parallel.For(0, 30000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     lock (o)     {      _Products.Push(product);     }    });   }   /*执行集合数据添加操作*/   static void AddConcurrenProducts()   {    Parallel.For(0, 30000, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     _ConcurrenProducts.Push(product);    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

ConcurrentStack 还有另外两种方法:TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除,下面贴代码:

class Program
  {
    private static object o = new object();   private static ConcurrentStack<Product> _ConcurrenProducts { get; set; }   /* coder:天才卧龙   * ConcurrentQueue 下的 TryPeek 和 TryPop   */   static void Main(string[] args)   {    _ConcurrenProducts = new ConcurrentStack<Product>();    /*执行添加操作*/    Console.WriteLine("执行添加操作");    Parallel.Invoke(AddConcurrenProducts, AddConcurrenProducts);    Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);    /*执行TryPeek操作 尝试返回不移除*/    Console.WriteLine("执行TryPeek操作 尝试返回不移除");    Parallel.Invoke(PeekConcurrenProducts, PeekConcurrenProducts);    Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);    /*执行TryDequeue操作 尝试返回并移除*/    Console.WriteLine("执行TryPop操作 尝试返回并移除");    Parallel.Invoke(PopConcurrenProducts, PopConcurrenProducts);    Console.WriteLine("ConcurrentStack<Product> 当前数据量为:" + _ConcurrenProducts.Count);    Console.ReadLine();   }   /*执行集合数据添加操作*/   static void AddConcurrenProducts()   {    Parallel.For(0, 100, (i) =>    {     Product product = new Product();     product.Name = "name" + i;     product.Category = "Category" + i;     product.SellPrice = i;     _ConcurrenProducts.Push(product);    });   }   /*尝试返回 但不移除*/   static void PeekConcurrenProducts()   {    Parallel.For(0, 2, (i) =>    {     Product product = null;     bool excute = _ConcurrenProducts.TryPeek(out product);     Console.WriteLine(product.Name);    });   }   /*尝试返回 并 移除*/   static void PopConcurrenProducts()   {    Parallel.For(0, 2, (i) =>    {     Product product = null;     bool excute = _ConcurrenProducts.TryPop(out product);     Console.WriteLine(product.Name);    });   }  }  class Product  {   public string Name { get; set; }   public string Category { get; set; }   public int SellPrice { get; set; }  }
View Code

对于并发下的其他集合,我这边就不做代码案列了,大家可以通过下面的链接查看,如有问题,欢迎指正

相关文章
|
3月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
298 0
|
2月前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
249 59
|
26天前
|
Java 关系型数据库 MySQL
【JavaEE“多线程进阶”】——各种“锁”大总结
乐/悲观锁,轻/重量级锁,自旋锁,挂起等待锁,普通互斥锁,读写锁,公不公平锁,可不可重入锁,synchronized加锁三阶段过程,锁消除,锁粗化
|
2月前
|
供应链 安全 NoSQL
PHP 互斥锁:如何确保代码的线程安全?
在多线程和高并发环境中,确保代码段互斥执行至关重要。本文介绍了 PHP 互斥锁库 `wise-locksmith`,它提供多种锁机制(如文件锁、分布式锁等),有效解决线程安全问题,特别适用于电商平台库存管理等场景。通过 Composer 安装后,开发者可以利用该库确保在高并发下数据的一致性和安全性。
48 6
|
2月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
58 6
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
85 7
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
3月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
45 1
|
3月前
|
Java 应用服务中间件 测试技术
Java21虚拟线程:我的锁去哪儿了?
【10月更文挑战第8天】
65 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等