8天玩转并行开发——第四天 同步机制(上)

简介:

     在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

的类来帮助我们简化并行计算中复杂的数据同步问题。

 

大体上分为二种:

①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

 

关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

些轻量级的同步机制。

 

一:Barrier(屏障同步)

 

1:基本概念

    msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段协同工作。乍一看有点不懂,没关系,我们采取提干法。

”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

 

好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

 

啰嗦了半天,还是上下代码说话:

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
//四个task执行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});

for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);

LoadUser(single);
barrier.SignalAndWait();

LoadProduct(single);
barrier.SignalAndWait();

LoadOrder(single);
barrier.SignalAndWait();
}, j);
}

Task.WaitAll(tasks);

Console.WriteLine("指定数据库中所有数据已经加载完毕!");

Console.Read();
}

static void LoadUser(int num)
{
Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);
}

static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}

static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}


2:死锁问题

    先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
//四个task执行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});

for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);

LoadUser(single);
barrier.SignalAndWait();

LoadProduct(single);
barrier.SignalAndWait();

LoadOrder(single);
barrier.SignalAndWait();

}, j);
}

Task.WaitAll(tasks);

barrier.Dispose();

Console.WriteLine("指定数据库中所有数据已经加载完毕!");

Console.Read();
}

static void LoadUser(int num)
{
Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);

if (num == 0)
{
//num=0:表示0号任务
//barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出
// SpinWait.SpinUntil: 自旋锁,相当于死循环
SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);
}
}

static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}

static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}

我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

 

3:超时机制

    当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

还要采用CancellationToken机制。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
//四个task执行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();

CancellationToken ct = cts.Token;

barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});

for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);

LoadUser(single);

if (!barrier.SignalAndWait(2000))
{
//抛出异常,取消后面加载的执行
throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct);
}

LoadProduct(single);
barrier.SignalAndWait();

LoadOrder(single);
barrier.SignalAndWait();

}, j, ct);
}

//等待所有tasks 4s
Task.WaitAll(tasks, 4000);

try
{
for (int i = 0; i < tasks.Length; i++)
{
if (tasks[i].Status == TaskStatus.Faulted)
{
//获取task中的异常
foreach (var single in tasks[i].Exception.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}
}

barrier.Dispose();
}
catch (AggregateException e)
{
Console.WriteLine("我是总异常:{0}", e.Message);
}

Console.Read();
}

static void LoadUser(int num)
{
Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);

if (num == 0)
{
//自旋转5s
if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000))
return;
}

Console.WriteLine("当前任务:{0}正在加载User数据完毕!", num);
}

static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}

static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}


二:spinLock(自旋锁)

    我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
static SpinLock slock = new SpinLock(false);

static int sum1 = 0;

static int sum2 = 0;

static void Main(string[] args)
{
Task[] tasks = new Task[100];

for (int i = 1; i <= 100; i++)
{
tasks[i - 1] = Task.Factory.StartNew((num) =>
{
Add1((int)num);

Add2((int)num);

}, i);
}

Task.WaitAll(tasks);

Console.WriteLine("Add1数字总和:{0}", sum1);

Console.WriteLine("Add2数字总和:{0}", sum2);

Console.Read();
}

//无锁
static void Add1(int num)
{
Thread.Sleep(100);

sum1 += num;
}

//自旋锁
static void Add2(int num)
{
bool lockTaken = false;

Thread.Sleep(100);

try
{
slock.Enter(ref lockTaken);
sum2 += num;
}
finally
{
if (lockTaken)
slock.Exit(false);
}
}
}

相关文章
|
4月前
|
Java 开发者
【编程高手必备】Java多线程编程实战揭秘:解锁高效并发的秘密武器!
【8月更文挑战第22天】Java多线程编程是提升软件性能的关键技术,可通过继承`Thread`类或实现`Runnable`接口创建线程。为确保数据一致性,可采用`synchronized`关键字或`ReentrantLock`进行线程同步。此外,利用`wait()`和`notify()`方法实现线程间通信。预防死锁策略包括避免嵌套锁定、固定锁顺序及设置获取锁的超时。掌握这些技巧能有效增强程序的并发处理能力。
28 2
|
4月前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
47 0
|
6月前
|
Java 调度
【实战指南】Java多线程高手秘籍:线程生命周期管理,掌控程序命运的钥匙!
【6月更文挑战第19天】Java多线程涉及线程生命周期的五个阶段:新建、就绪、运行、阻塞和死亡。理解这些状态转换对性能优化至关重要。线程从新建到调用`start()`变为就绪,等待CPU执行。获得执行权后进入运行状态,执行`run()`。遇到阻塞如等待锁时,进入阻塞状态。完成后或被中断则死亡。管理线程包括合理使用锁、利用线程池、处理异常和优雅关闭线程。通过控制这些,能编写更高效稳定的多线程程序。
47 1
|
7月前
|
设计模式 消息中间件 存储
18个并发场景的设计模式详解,有没有你的盲区
这些模式在多线程并发编程中非常有用`。在分布式应用中,并发场景无处不在,理解和掌握这些并发模式的编码技巧,有助于我们在开发中解决很多问题,这要把这些与23种设计模式混淆了,虽然像单例模式是同一个,但这个是考虑并发场景下的应用。内容比较多,V哥建议可以收藏起来,即用好查。拜拜了您誒,晚安。
192 1
18个并发场景的设计模式详解,有没有你的盲区
|
算法 Java Linux
工作这么久了,还不懂多线程吗?
浩哥Java多线程整理学习系列之01基础知识整理
108 0
工作这么久了,还不懂多线程吗?
|
Java
【多线程3:基础原理】
【多线程3:基础原理】
114 0
|
缓存 NoSQL 算法
|
存储 安全 算法
重生之我在人间敲代码_Java并发基础_安全性、活跃性以及性能问题
并发编程中我们需要注意的问题有很多,很庆幸前人已经帮我们总结过了,主要有三个方面,分别是:安全性问题、活跃性问题和性能问题。
|
监控
【多线程:犹豫模式】
【多线程:犹豫模式】
136 0