并行编程之任务并行

简介: 任务并行库 (TPL) 基于任务的概念。术语“任务并行”是指同时运行的一个或多个任务。任务表示异步操作,在某些方面它类似于创建新线程或 ThreadPool 工作项,但抽象级别较高。任务提供两个主要好处: 系统资源的使用效率更高,可伸缩性更好。

任务并行库 (TPL) 基于任务的概念。术语“任务并行”是指同时运行的一个或多个任务。任务表示异步操作,在某些方面它类似于创建新线程或 ThreadPool 工作项,但抽象级别较高。任务提供两个主要好处:

  • 系统资源的使用效率更高,可伸缩性更好。

    在后台,任务排队到 ThreadPool,ThreadPool 已使用登山等算法进行增强,这些算法能够确定并调整到可最大化吞吐量的线程数。这会使任务相对轻量,您可以创建很多任务以启用细化并行。为了补偿这一点,可使用众所周知的工作窃取算法提供负载平衡。

  • 对于线程或工作项,可以使用更多的编程控件。

    任务和围绕它们生成的框架提供了一组丰富的 API,这些 API 支持等待、取消、继续、可靠的异常处理、详细状态、自定义计划等功能。

出于这两个原因,在 .NET Framework 4 中,任务是用于编写多线程、异步和并行代码的首选 API。

 

1:隐式创建和运行任务

Parallel..::.Invoke 方法提供了一种简便方式,可同时运行任意数量的任意语句。只需为每个工作项传入 Action 委托即可。创建这些委托的最简单方式是使用 lambda 表达式。

Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());

为了更好地控制任务执行或从任务返回值,必须更加显式地使用 Task 对象。

 

2:显式创建和运行任务

任务由 System.Threading.Tasks..::.Task 类表示。返回值的任务由 System.Threading.Tasks..::.Task<(Of <(TResult>)>) 类表示,该类从 Task 继承。

任务对象处理基础结构详细信息,并提供可在任务的整个生存期内从调用线程访问的方法和属性。 例如,可以随时访问任务的 Status 属性,以确定它是已开始运行、已完成运行、已取消还是引发了异常。 状态由 TaskStatus 枚举表示。

在创建任务时,您赋予它一个用户委托,该委托封装该任务将执行的代码。该委托可以表示为命名的委托、匿名方法或 lambda 表达式。lambda 表达式可以包含对命名方法的调用,如下面的示例所示。

代码
 
    
Task < byte [] > getData = new Task < byte [] > (() => GetFileData());
Task
< double [] > analyzeData = getData.ContinueWith(x => Analyze(x.Result));
Task
< string > reportData = analyzeData.ContinueWith(y => Summarize(y.Result));
getData.Start();
// or...
Task < string > reportData2 = Task.Factory.StartNew(() => GetFileData())
.ContinueWith((x)
=> Analyze(x.Result))
.ContinueWith((y)
=> Summarize(y.Result));
System.IO.File.WriteAllText(
@" C:\reportFolder\report.txt " , reportData.Result);

 

使用 ContinueWhenAll()()() 方法和 ContinueWhenAny()()() 方法,可以从多个任务继续。有关更多信息,请参见延续任务如何:用延续将多个任务链接在一起

  
4:等待任务

System.Threading.Tasks..::.Task 类型和 System.Threading.Tasks..::.Task<(Of <(TResult>)>) 类型提供使您可以等待任务完成的 Wait 方法的一些重载。此外,静态 TaskWaitAll()()() 方法和 TaskWaitAny()()() 方法的重载使您可以等待任务数组的任一任务或所有任务完成。

通常,会出于以下某个原因等待任务:

  • 主线程依赖于任务计算的最终结果。

  • 您必须处理可能从任务引发的异常。

代码
 
   
private void buttonParallelTaskWait_Click( object sender, RoutedEventArgs e)
{
// Wait on a single task with no timeout specified.
Task task1 = Task.Factory.StartNew(() => DoSomeWork( 10000000 ));
task1.Wait();
ConsoleTexter.Clear();
ConsoleTexter.WriteLine(
" task1 has completed. " );

// Wait on a single task with a timeout specified.
Task task2 = Task.Factory.StartNew(() => DoSomeWork( 10000000 ));
task2.Wait(
100 ); // Wait for 100 ms.

if (task2.IsCompleted)
ConsoleTexter.WriteLine(
" task2 has completed. " );
else
ConsoleTexter.WriteLine(
" Timed out before task2 completed. " );

// Wait for all tasks to complete.
Task[] tasks = new Task[ 10 ];
for ( int i = 0 ; i < 10 ; i ++ )
{
tasks[i]
= Task.Factory.StartNew(() => DoSomeWork( 10000000 ));
}
Task.WaitAll(tasks);

// Wait for first task to complete.
Task < double > [] tasks2 = new Task < double > [ 3 ];

// Try three different approaches to the problem. Take the first one.
tasks2[ 0 ] = Task < double > .Factory.StartNew(() => TrySolution1());
tasks2[
1 ] = Task < double > .Factory.StartNew(() => TrySolution2());
tasks2[
2 ] = Task < double > .Factory.StartNew(() => TrySolution3());

int index = Task.WaitAny(tasks2);
double d = tasks2[index].Result;
ConsoleTexter.WriteLine(
" task[{0}] completed first with result of {1}. " , index, d);
MessageBox.Show(ConsoleTexter.Out.ToString());
}

 

5:任务中的异常处理

当某个任务引发一个或多个异常时,异常包装在 AggregateException 中。该异常传播回与该任务联接的线程,此线程通常是正在等待该任务或尝试访问该任务的 Result 属性的线程。此行为用于强制实施所有未处理的异常默认情况下应关闭进程的 .NET Framework 策略。调用代码可以通过在任务或任务组上使用 WaitWaitAll()()()WaitAny()()() 方法或 Result()()() 属性,或者通过在 try-catch 块中包括 Wait 方法,来处理异常。

联接线程也可以通过在对任务进行垃圾回收之前访问 Exception 属性来处理异常。通过访问此属性,可防止未处理的异常触发在对象完成时关闭进程的异常传播行为。

异常处理的例子见下文的取消任务。 

 

6:取消任务

System.Threading.Tasks..::.TaskSystem.Threading.Tasks..::.Task<(Of <(TResult>)>) 类支持通过使用取消标记(.NET Framework 4 中的新功能)进行取消。有关更多信息,请参见取消在任务的类中,取消涉及到表示可取消操作的用户委托与请求取消的代码之间的协作。 成功的取消涉及到调用 CancellationTokenSource..::.Cancel 方法的请求代码,以及及时终止操作的用户委托。可以使用以下选项之一终止操作:

  • 简单地从委托中返回。在许多情况下,这样已足够;但是,采用这种方式“取消”的任务实例会转换为 RanToCompletion 状态,而不是 Canceled 状态。

  • 引发 OperationCanceledException,并将其传递到在其上请求了取消的标记。完成此操作的首选方式是使用 ThrowIfCancellationRequested 方法。采用这种方式取消的任务会转换为 Canceled 状态,调用代码可使用该状态来验证任务是否响应了其取消请求。

 

代码
 
    
CancellationTokenSource tokenSource2;
CancellationToken ct;
Task task;
public UserControlParallel()
{
InitializeComponent();
tokenSource2
= new CancellationTokenSource();
ct
= tokenSource2.Token;
}

// 任务开始
private void buttonParallelTaskStart_Click( object sender, RoutedEventArgs e)
{
task
= Task.Factory.StartNew(() =>
{
// Were we already canceled?
ct.ThrowIfCancellationRequested();
bool moreToDo = true ;
while (moreToDo)
{
// Poll on this property if you have to do
// other cleanup before throwing.
Thread.Sleep( 100 );
if (ct.IsCancellationRequested)
{
// Clean up here, then...
ct.ThrowIfCancellationRequested();
}
}
}, tokenSource2.Token);
// Pass same token to StartNew.
}

// 任务结束
private void buttonParallelTaskCancel_Click( object sender, RoutedEventArgs e)
{
tokenSource2.Cancel();
// Just continue on this thread, or Wait/WaitAll with try-catch:
try
{
task.Wait();
}
catch (AggregateException err)
{
ConsoleTexter.Clear();
foreach (var v in err.InnerExceptions)
ConsoleTexter.WriteLine(
" msg: " + v.Message);
MessageBox.Show(ConsoleTexter.Out.ToString());
}
}

更多参考http://msdn.microsoft.com/zh-cn/library/dd537609.aspx

 

 

 
Creative Commons License本文基于 Creative Commons Attribution 2.5 China Mainland License发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名 http://www.cnblogs.com/luminji(包含链接)。如您有任何疑问或者授权方面的协商,请给我留言。
目录
相关文章
|
2月前
|
缓存 编译器 程序员
C/C++编译器并行优化技术:并行优化针对多核处理器和多线程环境进行优化,以提高程序的并行度
C/C++编译器并行优化技术:并行优化针对多核处理器和多线程环境进行优化,以提高程序的并行度
83 0
|
3月前
|
Rust 并行计算 安全
Rust中的并行与并发优化:释放多核性能
Rust语言以其内存安全和高效的并发模型在并行计算领域脱颖而出。本文深入探讨了Rust中的并行与并发优化技术,包括使用多线程、异步编程、以及并行算法等。通过理解并应用这些技术,Rust开发者可以有效地利用多核处理器,提高程序的性能和响应能力。
|
6月前
|
安全 前端开发 程序员
C++11 并发编程基础(一):并发、并行与C++多线程
C++11标准在标准库中为多线程提供了组件,这意味着使用C++编写与平台无关的多线程程序成为可能,而C++程序的可移植性也得到了有力的保证。另外,并发编程可提高应用的性能,这对对性能锱铢必较的C++程序员来说是值得关注的。
114 0
C++11 并发编程基础(一):并发、并行与C++多线程
|
SQL 开发框架 算法
C#多线程开发-任务并行库04
C#多线程开发-任务并行库04
131 0
C#多线程开发-任务并行库04
|
并行计算 Java 数据挖掘
对于并行和并行概念上的理解与总结
并行和并行概念上的理解与总结
1103 0
|
Java 异构计算
Java并发编程之概念一:并行与并发
Java并发编程之概念一:并行与并发概念解释并行性和并发性是既相似又有区别的两个概念。 并行性是指两个或多个事件在同一时刻发生。 而并发性是指连个或多个事件在同一时间间隔内发生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机环境下(一个处理器),每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。
2274 0
|
并行计算
《并行计算的编程模型》一1.8 并行I/O
本节书摘来华章计算机《并行计算的编程模型》一书中的第1章 ,第1.8节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1928 0
|
并行计算
《并行计算的编程模型》一2.2.2 线程
本节书摘来华章计算机《并行计算的编程模型》一书中的第2章 ,第2.2.2节, [(美)帕万·巴拉吉(Pavan Balaji)编著;张云泉等译,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
870 0