当多个线程同时对同一个内存地址进行写入时,由于CPU时间调度上的问题写入数据会被多次的覆盖,所以就要使线程同步。所谓的同步就是协同步调,按预定的先后次序进行运行。线程同步是指多线程通过特定的设置来控制线程之间的执行顺序,也可以说是在线程之间通过同步建立起执行顺序的关系。.Net 为我们提供了多种线程同步的解决方案:
- 使用原子操作,一个操作只占用一个量子时间,一次就能完成,在当前操作完成后其他线程才能执行其他操作。这种方法可以避免使用锁进而排除了产生死锁的可能性;
- 将等待的线程置于阻塞状态,这就意味着将会引入最少一次上下文切换。这种方法会消耗大量的资源,只有在线程需要长时间被挂起时方可使用;
- 利用简单等待,这种方式减少切换上下文的时间,但是在等待过程中却增加了 CPU 的时间,它只适用于线程短暂等待的情况下;
- 混合模式,首先利用简单等待,如果线程等待时间太长,就会自动切换到阻塞状态。
下面我将利用两篇文章来讲解以上四种方式在 .NET 中使用,本篇文章讲解的内容主要有:
- 原子操作
- Mutex
- SemaphoreSlim
- AutoResetEvent
- ManualResetEventSilm
零、原子操作
原子本意是不能被进一步分割的最小粒子,而原子操作指的是 不可被中断的一个或一系列操作 。在C#中有多个线程同时对某个变量进行操作的时候,我们应该使用原子操作防止多线程取到的值不是最新的值。使用.NET提供的Interlocked类可以对一些数据进行原子操作,效果看起来似乎跟 lock 锁一样,但它的原子操作是基于 CPU 本身的非阻塞的,所以要比 lock 的效率高。
using System.Threading; using static System.Console; using static System.Threading.Thread; namespace NoThree { class Program { private static int runCount = 0; static Number number = new Number(); static void Main(string[] args) { Thread thread1 = new Thread(Run); Thread thread2 = new Thread(Run); Thread thread3 = new Thread(Run); thread1.Start(); thread2.Start(); thread3.Start(); thread1.Join(); thread2.Join(); thread3.Join(); WriteLine($"runCount = {runCount}"); Read(); } static void Run() { for (int i = 0; i < 10000; i++) { number.Add(); number.Subtraction(); } } class Number { public void Add() { //runCount++; Interlocked.Increment(ref runCount); } public void Subtraction() { //runCount--; Interlocked.Decrement(ref runCount); } } } }
在上述代码中我创建了三个线程,它们都调用 Run 方法。 Run 方法调用 Number 类的 Add 和 Subtraction 10000 次。 在这两个方法中我们分别调用了 Interlocked 的 Increment 和 Decrement 方法,这两个方法类似于 ++ 和 – ,但相对来说这两个方法要比 ++ 和 – 安全。如果不使用 Increment 和 Decrement ,会出现 thread1 线程执行完 Add 方法后,thread2 又执行了 Add 方法,这样 thread2 runCount 初始值就不是 0 ,执行完 Add 方法后值会被覆盖。就出现了结果不为 0 的情况。借助于Interlocked类,我们无需锁定任何对象即可获取到正确的结果。Interlocked提供了 Increment 、 Decrement 和 Add 等基本数学操作的原子方法。
一、Mutex
Mutex 是一种原始的同步方式,其只对一个线程授予对共享资源的独占访问。当多个线程同时访问共享资源时,Mutex 仅向一个线程授予对共享资源的独占访问权限。如果线程获取互斥体,则需要获取该互斥体的第二个线程将挂起,直到第一个线程释放该互斥体。这里需要注意,具名互斥体是全局操作对象,必须正确关闭否则就会导致其他线程一直在等待,直到超时。关闭互斥体也很简单,只需要用 using 代码块包裹互斥体即可。这种方法经常被用于不同进程之间线程同步。
using System.Threading; using static System.IO.File; using static System.Console; using static System.Threading.Thread; using static System.DateTime; namespace MutexClass { class Program { static void Main(string[] args) { Thread thread1 = new Thread(WriteFile); Thread thread2 = new Thread(WriteFile); thread1.Name = "thread1"; thread2.Name = "thread2"; thread1.Start(); thread2.Start(); Read(); } static void WriteFile() { const string mutexName = "write_file"; using (var m = new Mutex(false, mutexName)) { if (m.WaitOne(1000, false)) { WriteLine($"{CurrentThread.Name}:开始写入 {Now}"); for (int i = 0; i < 10000; i++) { AppendAllText("mutex.txt", i.ToString()); } Thread.Sleep(5000); WriteLine($"{CurrentThread.Name}:写入完毕 {Now}"); m.ReleaseMutex(); } else { WriteLine($"{CurrentThread.Name}:其他线程正在占用文件!{Now}"); } } } } }
在上述代码中我们定义了一个 WriteFile 方法,利用这个方法向文件 mutex.txt 写入内容。在方法的第二行我们定义了一个互斥量,名称是 write_file ,并设置 initiallyOwned 为 false 。参数 initiallyOwned 如果为 true,则给予调用线程已命名的系统互斥体的初始所属权(如果已命名的系统互斥体是通过此调用创建的)否则为 false。之后我们调用 WaitOne 方法组织当前线程操作,让当前线程在5秒内接收互斥量,并指定等待之前不退出同步域。当返回值为 true 时则代表已经接收到信号。最后我们调用 ReleaseMutex 方法释放线程拥有的互斥体的控制权。
Tip:
- 如果第二个线程在等待时间内没有收到互斥量,那么即使前一个线程执行完毕它也不会接着执行;
- 如果需要让第二个线程一直等待,只需要将 WaitOne 的超时时间设置为 -1 即可。
二、SemaphoreSlim
在开发中我们会遇到某某连接池已满或超出某某可连接的最大数量,这种情况就是我们要操作的东西限制了可连接的线程数(当然有些情况并不是这个原因)。同样我们在开发项目的时候需要访问某些共享资源(比如数据库、文件)时需要限制链接的线程数量,这时我们就可以用 SemaphoreSlim 类来进行处理。 SemaphoreSlim 类可以让我们通过信号系统限制访问共享资源的并发线程数量,当超出限制并发线程数量时,超出的线程将会等待,直到有线程调用 Release 方法发出信号,超出的线程才会开始访问共享资源。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using static System.Console; using static System.Threading.Thread; namespace SemaphoreSlimClass { class Program { static SemaphoreSlim ss = new SemaphoreSlim(3); static void Main(string[] args) { for(int i=0;i<12;i++) { Thread thread = new Thread(ReadTxt); thread.Start(); thread.Name = $"线程{i}"; } Read(); } static void ReadTxt() { WriteLine($"{CurrentThread.Name} 线程进入"); ss.Wait(); WriteLine($"{CurrentThread.Name} 线程开始读取文件"); Random ran = new Random(); int n = ran.Next(1000,5000); Sleep(n); WriteLine($"{CurrentThread.Name} 线程完毕"); ss.Release(); } } }
上述代码中,首先我们创建了一个 SemaphoreSlim 实例,并指定可并发访问线程数量为 4 ,之后通过 for 循环创建了 12 个线程。这 12 个线程都调用 ReadTxt 方法。这个方法中调用 Wait 方法让当前线程等待进入 SemaphoreSlim ,一旦剩余并发访问线程数量大于 0 或有线程调用 Release 发出信号,则继续执行。在 C# 中还存在一个名叫 Semaphore 的类,这个类一般用的很少,功能和 Mutex 功能类似,一般用在跨进程的线程同步中。它和 SemaphoreSlim 不同点是 Semaphore使用的是系统内核时间,而 SemaphoreSlim 不使用系统内核时间。
三、AutoResetEvent
有时候我们需要在线程之间通讯,我们可以借助数据库、文件进行解决,但是这都不是好办法。.NET 给我们提供了更好的办法–利用 AutoResetEvent 类。我们利用 AutoResetEvent 类告诉等待执行的线程有事件要发生。 线程通过调用 AutoResetEvent 上的 WaitOne 来等待信号。如果 AutoResetEvent 处于非终止状态,则该线程阻塞,并等待当前控制资源的线程通过调用 Set 发出资源可用的信号。调用 Set 向 AutoResetEvent 发信号以释放等待线程。AutoResetEvent 将保持终止状态,直到一个正在等待的线程被释放,然后自动返回非终止状态。如果没有任何线程在等待,则状态将无限期地保持为终止状态。可以通过将一个布尔值传递给构造函数来控制 AutoResetEvent 的初始状态,如果初始状态为终止状态,则为 true;否则为 false。通俗的来讲只有等 Set() 成功运行后, WaitOne() 才能够获得运行机会。Set 是发信号,WaitOne 是等待信号,只有发了信号,等待的才会执行。如果不发的话,WaitOne 后面的程序就永远不会执行。下面我们通过饭店吃饭的例子来看一下:
using static System.Console; using static System.Threading.Thread; using System.Threading; namespace AutoResetEventClass { class Program { private static AutoResetEvent serveEvent = new AutoResetEvent(false); private static AutoResetEvent cookEvent = new AutoResetEvent(false); static void Main(string[] args) { Thread orderThread = new Thread(Serve); Thread cookThread = new Thread(Cook); orderThread.Start(); cookThread.Start(); for (int i = 0; i < 20; i++) { WriteLine($"点餐{i + 1}"); cookEvent.Set(); } Read(); } static void Serve() { while (true) { serveEvent.WaitOne(); Sleep(5000); WriteLine("上菜完毕!!!!"); } } static void Cook() { while (true) { cookEvent.WaitOne(); Sleep(5000); WriteLine("做饭完毕!!!!"); serveEvent.Set(); } } } }
四、ManualResetEventSlim
上一小节所讲的 AutoResetEvent 类使用的是内核时间,因此不能等待太长时间,如果需要等待时间很长的话我们就需要用到 ManualResetEventSilm 类。好比是学校大门,当调用 Set 时,相当于打开了大门从而允许准备放学的学生(线程)放学回家。如果有但是在大门开启时间内如果有学生还在睡觉,之后调用 Rest 方法关闭了大门,那么这个学生就没法放学了,就只能等待下次大门打开。
using static System.Console; using static System.Threading.Thread; using System.Threading; using System; namespace ManualResetEventSilmClass { class Program { static ManualResetEventSlim mre = new ManualResetEventSlim(false); static void Main(string[] args) { Thread studentThread1 = new Thread(WaitingForSchool); Thread studentThread2 = new Thread(WaitingForSchool); studentThread1.Name = "小明"; studentThread2.Name = "小刘"; studentThread1.Start(); studentThread2.Start(); Sleep(5000); WriteLine("放学了!!!"); mre.Set(); Sleep(2000); mre.Reset(); WriteLine("上课了!!!!!"); Sleep(5000); WriteLine("又放学了!!!"); mre.Set(); Sleep(2000); mre.Reset(); WriteLine("又上课了!!!!!"); Read(); } static void WaitingForSchool() { Console.WriteLine($"{CurrentThread.Name} 睡觉中"); Random ran = new Random(); int n = ran.Next(1000, 10000); Sleep(n); Console.WriteLine($"{CurrentThread.Name} 等待放学!"); mre.Wait(); Console.WriteLine($"{CurrentThread.Name} 放学了!"); } } }
下面我们对比一下 AutoResetEvent 和 ManualResetEventSlim 的异同点:
- 共同点:
- Set方法将事件状态设置为终止状态,允许一个或多个等待线程继续;Reset方法将事件状态设置为非终止状态,导致线程阻止;WaitOne阻止当前线程,直到当前线程的WaitHandler收到事件信号。
- 可以通过构造函数的参数值来决定其初始状态,若为true则事件为终止状态从而使线程为非阻塞状态,为false则线程为阻塞状态。
- 如果某个线程调用WaitOne方法,则当事件状态为终止状态时,该线程会得到信号,继续向下执行。
- 不同点:
- AutoResetEvent.WaitOne()每次只允许一个线程进入,当某个线程得到信号后,AutoResetEvent会自动又将信号置为不发送状态,则其他调用WaitOne的线程只有继续等待,也就是说AutoResetEvent一次只唤醒一个线程;
- ManualResetEvent则可以唤醒多个线程,因为当某个线程调用了ManualResetEvent.Set()方法后,其他调用WaitOne的线程获得信号得以继续执行,而ManualResetEvent不会自动将信号置为不发送。
- 除非手工调用了ManualResetEvent.Reset()方法,则ManualResetEvent将一直保持有信号状态,ManualResetEvent也就可以同时唤醒多个线程继续执行。