8天玩转并行开发——第五天 同步机制(下)

简介:

   

     承接上一篇,我们继续说下.net4.0中的同步机制,是的,当出现了并行计算的时候,轻量级别的同步机制应运而生,在信号量这一块

出现了一系列的轻量级,今天继续介绍下面的3个信号量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim。

 

一:CountdownEvent

     这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个

人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角

度上来说它是定义了最多能够进入关键代码的线程数。

     但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,

这样做有什么好处呢?还是承接上一篇文章所说的,比如一个任务需要加载1w条数据,那么可能出现这种情况。

 

加载User表:         根据user表的数据量,我们需要开5个task。

加载Product表:    产品表数据相对比较多,计算之后需要开8个task。

加载order表:       由于我的网站订单丰富,计算之后需要开12个task。

 

先前的文章也说了,我们需要协调task在多阶段加载数据的同步问题,那么如何应对这里的5,8,12,幸好,CountdownEvent给我们提供了

可以动态修改的解决方案。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
//默认的容纳大小为“硬件线程“数
static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);

static void Main(string[] args)
{
//加载User表需要5个任务
var userTaskCount = 5;

//重置信号
cde.Reset(userTaskCount);

for (int i = 0; i < userTaskCount; i++)
{
Task.Factory.StartNew((obj) =>
{
LoadUser(obj);
}, i);
}

//等待所有任务执行完毕
cde.Wait();

Console.WriteLine("\nUser表数据全部加载完毕!\n");

//加载product需要8个任务
var productTaskCount = 8;

//重置信号
cde.Reset(productTaskCount);

for (int i = 0; i < productTaskCount; i++)
{
Task.Factory.StartNew((obj) =>
{
LoadProduct(obj);
}, i);
}

cde.Wait();

Console.WriteLine("\nProduct表数据全部加载完毕!\n");

//加载order需要12个任务
var orderTaskCount = 12;

//重置信号
cde.Reset(orderTaskCount);

for (int i = 0; i < orderTaskCount; i++)
{
Task.Factory.StartNew((obj) =>
{
LoadOrder(obj);
}, i);
}

cde.Wait();

Console.WriteLine("\nOrder表数据全部加载完毕!\n");

Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");

Console.Read();
}

static void LoadUser(object obj)
{
try
{
Console.WriteLine("当前任务:{0}正在加载User部分数据!", obj);
}
finally
{
cde.Signal();
}
}

static void LoadProduct(object obj)
{
try
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", obj);
}
finally
{
cde.Signal();
}
}

static void LoadOrder(object obj)
{
try
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", obj);
}
finally
{
cde.Signal();
}
}
}


我们看到有两个主要方法:Wait和Signal。每调用一次Signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要

注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的,比如下面的

重载public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具体使用可以看前一篇文章的介绍。

 

二:SemaphoreSlim

     在.net 4.0之前,framework中有一个重量级的Semaphore,人家可以跨进程同步,咋轻量级不行,msdn对它的解释为:限制可同时访问

某一资源或资源池的线程数。关于它的重量级demo,我的上一个系列有演示,你也可以理解为CountdownEvent是 SemaphoreSlim的功能加

强版,好了,举一个轻量级使用的例子。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);

static void Main(string[] args)
{
for (int i = 0; i < 12; i++)
{
Task.Factory.StartNew((obj) =>
{
Run(obj);
}, i);
}

Console.Read();
}

static void Run(object obj)
{
slim.Wait();

Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);

//这里busy3s中
Thread.Sleep(3000);

slim.Release();
}
}


同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像SemaphoreSlim这种定死的”线程请求范围“,其实是降低了扩展性,

所以说,试水有风险使用需谨慎,在觉得有必要的时候使用它。

 

三: ManualResetEventSlim

     相信它的重量级别大家都知道是ManualReset,而这个轻量级别采用的是"自旋等待“+”内核等待“,也就是说先采用”自旋等待的方式“等待,

直到另一个任务调用set方法来释放它。如果迟迟等不到释放,那么任务就会进入基于内核的等待,所以说如果我们知道等待的时间比较短,采

用轻量级的版本会具有更好的性能,原理大概就这样,下面举个小例子。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
//2047:自旋的次数
static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);

static void Main(string[] args)
{

for (int i = 0; i < 12; i++)
{
Task.Factory.StartNew((obj) =>
{
Run(obj);
}, i);
}

Console.WriteLine("当前时间:{0}我是主线程{1},你们这些任务都等2s执行吧:\n",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);

mrs.Set();

Console.Read();
}

static void Run(object obj)
{
mrs.Wait();

Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
}
}

相关文章
|
7月前
|
消息中间件 缓存 Java
根据实际开发经验(订单管理系统),谈谈多线程开发的好处
根据实际开发经验(订单管理系统),谈谈多线程开发的好处
101 0
|
安全 Java 微服务
《微服务实战》 第二十五章 Java多线程安全与锁
《微服务实战》 第二十五章 Java多线程安全与锁
144 0
|
消息中间件 监控 数据库
|
消息中间件 Dubbo NoSQL
Java后端开发三年多线程你都懂,问你异步编程你说你没听过???
以前需要异步执行一个任务时,一般是用Thread或者线程池Executor去创建。如果需要返回值,则是调用Executor.submit获取Future。但是多个线程存在依赖组合,我们又能怎么办?可使用同步组件CountDownLatch、CyclicBarrier等;其实有简单的方法,就是用CompletableFuture
291 0
|
监控
【多线程:犹豫模式】
【多线程:犹豫模式】
136 0
|
安全 Java
面试突击45:为什么要用读写锁?它有什么优点?
读写锁(Readers-Writer Lock)顾名思义是一把锁分为两部分:读锁和写锁。
423 0
|
存储 缓存 安全
|
设计模式 运维 架构师
90%的程序员,都没用过多线程和锁,怎么成为架构师?
如果是框架和中间件的存在,是了让程序员只关心业务开发。那为什么你面试的时候会被问到核心组件的设计和原理呢? 在这个年代,别放弃学习是你几乎唯一的生存途径。
172 0
90%的程序员,都没用过多线程和锁,怎么成为架构师?
|
算法 NoSQL Java
2021-Java后端工程师面试指南-(并发-多线程)(上)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
121 0
|
监控 并行计算 安全
2021-Java后端工程师面试指南-(并发-多线程)(下)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
103 0