来自 “C#并发编程经典实例”
优秀软件的一个关键特征就是具有并发性。过去的几十年,我们可以进行并发编程,但是难度很大。以前,并发性软件的编写、调试和维护都很难,这导致很多开发人员为图省事放弃了并发编程。新版.NET 中的程序库和语言特征,已经让并发编程变得简单多了。随着Visual Studio 2012 的发布,微软明显降低了并发编程的门槛。以前只有专家才能做并发编程,而今天,每一个开发人员都能够(而且应该)接受并发编程。
1.1 并发编程简介
首先,我来解释几个贯穿本书始终的术语。先来介绍并发。
• 并发
同时做多件事情。
这个解释直接表明了并发的作用。终端用户程序利用并发功能,在输入数据库的同时响应用户输入。服务器应用利用并发,在处理第一个请求的同时响应第二个请求。只要你希望程序同时做多件事情,你就需要并发。几乎每个软件程序都会受益于并发。在编写本书时(2014 年),大多数开发人员一看到“并发”就会想到“多线程”。对这两个概念,需要做一下区分。
• 多线程
并发的一种形式,它采用多个线程来执行程序。
从字面上看,多线程就是使用多个线程。本书后续章节将介绍,多线程是并发的一种形式,但不是唯一的形式。实际上,直接使用底层线程类型在现代程序中基本不起作用。比起老式的多线程机制,采用高级的抽象机制会让程序功能更加强大、效率更高。因此,本书将尽量不涉及一些过时的技术。书中所有多线程的方法都采用高级类型,而不是Thread或BackgroundWorker。
一旦你输入new Thread(),那就糟糕了,说明项目中的代码太过时了。
但是,不要认为多线程已经彻底被淘汰了!因为线程池要求多线程继续存在。线程池存放任务的队列,这个队列能够根据需要自行调整。相应地,线程池产生了另一个重要的并发形式:并行处理。
• 并行处理
把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。为了让处理器的利用效率最大化,并行处理(或并行编程)采用多线程。当现代多核CPU执行大量任务时,若只用一个核执行所有任务,而其他核保持空闲,这显然是不合理的。并行处理把任务分割成小块并分配给多个线程,让它们在不同的核上独立运行。并行处理是多线程的一种,而多线程是并发的一种。在现代程序中,还有一种非常重要但很多人还不熟悉的并发类型:异步编程。
• 异步编程
并发的一种形式,它采用future 模式或回调(callback)机制,以避免产生不必要的线程。一个future(或promise)类型代表一些即将完成的操作。在.NET 中,新版future 类型有Task 和Task。在老式异步编程API 中,采用回调或事件(event),而不是future。异步编程的核心理念是异步操作:启动了的操作将会在一段时间后完成。这个操作正在执行时,不会阻塞原来的线程。启动了这个操作的线程,可以继续执行其他任务。当操作完成时,会通知它的future,或者调用回调函数,以便让程序知道操作已经结束。异步编程是一种功能强大的并发形式,但直至不久前,实现异步编程仍需要特别复杂的代码。VS2012 支持async 和await,这让异步编程变得几乎和同步(非并发)编程一样容易。
并发编程的另一种形式是响应式编程(reactive programming)。异步编程意味着程序启动一个操作,而该操作将会在一段时间后完成。响应式编程与异步编程非常类似,不过它是基并发编程概述
于异步事件(asynchronous event)的,而不是异步操作(asynchronous operation)。异步事件
可以没有一个实际的“开始”,可以在任何时间发生,并且可以发生多次,例如用户输入。
• 响应式编程
一种声明式的编程模式,程序在该模式中对事件做出响应。
如果把一个程序看作一个大型的状态机,则该程序的行为便可视为它对一系列事件做出响应,即每换一个事件,它就更新一次自己的状态。这听起来很抽象和空洞,但实际上并非如此。利用现代的程序框架,响应式编程已经在实际开发中广泛使用。响应式编程不一定是并发的,但它与并发编程联系紧密,因此本书介绍了响应式编程的基础知识。通常情况下,一个并发程序要使用多种技术。大多数程序至少使用了多线程(通过线程池)和异步编程。要大胆地把各种并发编程形式进行混合和匹配,在程序的各个部分使用合适的工具。
1.2 异步编程简介
异步编程有两大好处。第一个好处是对于面向终端用户的GUI 程序:异步编程提高了响应能力。我们都遇到过在运行时会临时锁定界面的程序,异步编程可以使程序在执行任务时仍能响应用户的输入。第二个好处是对于服务器端应用:异步编程实现了可扩展性。服务器应用可以利用线程池满足其可扩展性,使用异步编程后,可扩展性通常可以提高一个数量级。
现代的异步.NET 程序使用两个关键字:async 和await。async 关键字加在方法声明上,它的主要目的是使方法内的await 关键字生效(为了保持向后兼容,同时引入了这两个关键字)。如果async 方法有返回值,应返回Task;如果没有返回值,应返回Task。这些task 类型相当于future,用来在异步方法结束时通知主程序。
不要用void 作为async 方法的返回类型! async 方法可以返回void,但是这仅限于编写事件处理程序。一个普通的async 方法如果没有返回值,要返回Task,而不是void。
有了上述背景知识,我们来快速看一个例子:
async Task DoSomethingAsync()
{
int val = 13;
// 异步方式等待1 秒
await Task.Delay(TimeSpan.FromSeconds(1));
val *= 2;
// 异步方式等待1 秒
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(val);
}
和其他方法一样,async 方法在开始时以同步方式执行。在async 方法内部,await 关键字对它的参数执行一个异步等待。它首先检查操作是否已经完成,如果完成了,就继续运行(同步方式)。否则,它会暂停async 方法,并返回,留下一个未完成的task。一段时间后,操作完成,async 方法就恢复运行。
一个async 方法是由多个同步执行的程序块组成的,每个同步程序块之间由await 语句分隔。第一个同步程序块在调用这个方法的线程中运行,但其他同步程序块在哪里运行呢?情况比较复杂。最常见的情况是,用await 语句等待一个任务完成,当该方法在await 处暂停时,就可以捕捉上下文(context)。如果当前SynchronizationContext 不为空,这个上下文就是当前SynchronizationContext。如果当前SynchronizationContext 为空,则这个上下文为当前TaskScheduler。该方法会在这个上下文中继续运行。一般来说,运行UI 线程时采用UI 上下文,处理ASP.NET 请求时采用ASP.NET 请求上下文,其他很多情况下则采用线程池上下文。
因此,在上面的代码中,每个同步程序块会试图在原始的上下文中恢复运行。如果在UI线程中调用DoSomethingAsync,这个方法的每个同步程序块都将在此UI 线程上运行。但是,如果在线程池线程中调用,每个同步程序块将在线程池线程上运行。要避免这种错误行为, 可以在await 中使用ConfigureAwait 方法, 将参数continueOnCapturedContext 设为false。接下来的代码刚开始会在调用的线程里运行,在被await 暂停后,则会在线程池线程里继续运行:
async Task DoSomethingAsync()
{
int val = 13;
// 异步方式等待1 秒
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);//避免不同环境的上下文运行
val *= 2;
// 异步方式等待1 秒
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);//避免不同环境的上下文运行
Console.WriteLine(val.ToString());
}
最好的做法是,在核心库代码中一直使用ConfigureAwait。在外围的用户界面代码中,只在需要时才恢复上下文。
关键字await 不仅能用于任务,还能用于所有遵循特定模式的awaitable 类型。例如,Windows Runtime API 定义了自己专用的异步操作接口。这些接口不能转化为Task 类型,但确实遵循了可等待的(awaitable)模式,因此可以直接使用await。这种awaitable 类型在Windows 应用商店程序中更加常见,但是在大多数情况下,await 使用Task 或Task。
有两种基本的方法可以创建Task 实例。有些任务表示CPU 需要实际执行的指令,创建这种计算类的任务时,使用Task.Run(如需要按照特定的计划运行,则用TaskFactory.StartNew)。其他的任务表示一个通知(notification),创建这种基于事件的任务时,使用TaskCompletionSource。大部分I/O 型任务采用TaskCompletionSource。
使用async 和await 时,自然要处理错误。在下面的代码中,PossibleExceptionAsync 会抛出一个NotSupportedException 异常,而TrySomethingAsync 方法可很顺利地捕捉到这个异常。这个捕捉到的异常完整地保留了栈轨迹,没有人为地将它封装进TargetInvocationException 或AggregateException 类:
async Task TrySomethingAsync()
{
try
{
await PossibleExceptionAsync();
}
catch(NotSupportedException ex)
{
LogException(ex);
throw;
}
}
一旦异步方法抛出(或传递出)异常,该异常会放在返回的Task 对象中,并且这个Task对象的状态变为“已完成”。当await 调用该Task 对象时,await 会获得并(重新)抛出该异常,并且保留着原始的栈轨迹。因此,如果PossibleExceptionAsync 是异步方法,以下代码就能正常运行:
async Task TrySomethingAsync()
{
// 发生异常时,任务结束。不会直接抛出异常。
Task task = PossibleExceptionAsync();
try
{
//Task 对象中的异常,会在这条await 语句中引发
await task;
}
catch(NotSupportedException ex)
{
LogException(ex);
throw;
}
}
关于异步方法,还有一条重要的准则:你一旦在代码中使用了异步,最好一直使用。调用异步方法时,应该(在调用结束时)用await 等待它返回的task 对象。一定要避免使用Task.Wait 或Task.Result 方法,因为它们会导致死锁。参考一下下面这个方法:
async Task WaitAsync()
{
// 这里await会捕获当前上下文……
await Task.Delay(TimeSpan.FromSeconds(1));
// ……这里会试图用上面捕获的上下文继续执行
}
void Deadlock()
{
// 开始延迟
Task task = WaitAsync();
// 同步程序块,正在等待异步方法完成
task.Wait();
}
如果从UI 或ASP.NET 的上下文调用这段代码,就会发生死锁。这是因为,这两种上下文每次只能运行一个线程。Deadlock 方法调用WaitAsync 方法,WaitAsync 方法开始调用delay 语句。然后,Deadlock 方法(同步)等待WaitAsync 方法完成,同时阻塞了上下文线程。当delay 语句结束时,await 试图在已捕获的上下文中继续运行WaitAsync 方法,但这个步骤无法成功,因为上下文中已经有了一个阻塞的线程,并且这种上下文只允许同时运行一个线程。这里有两个方法可以避免死锁:在WaitAsync 中使用ConfigureAwait(false)(导致await 忽略该方法的上下文),或者用await 语句调用WaitAsync 方法(让Deadlock变成一个异步方法)。
如果使用了async,最好就一直使用它。
若想更全面地了解关于异步编程的知识,可参阅Alex Davies(O’Reilly)编写的Async inC# 5.0,这本书非常不错。另外,微软公司有关异步编程的在线文档也很不错,建议你至少读一读“async overview”和“Task-based Asynchronous Pattern(TAP) overview”这两篇。如果要深入了解,官方FAQ 和博客上也有大量的信息。
1.3 并行编程简介
如果程序中有大量的计算任务,并且这些任务能分割成几个互相独立的任务块,那就应该使用并行编程。并行编程可临时提高CPU 利用率,以提高吞吐量,若客户端系统中的CPU 经常处于空闲状态,这个方法就非常有用,但通常并不适合服务器系统。大多数服务器本身具有并行处理能力,例如ASP.NET 可并行地处理多个请求。某些情况下,在服务器系统中编写并行代码仍然有用(如果你知道并发用户数量会一直是少数)。但通常情况下,在服务器系统上进行并行编程,将降低本身的并行处理能力,并且不会有实际的好处。
并行的形式有两种:数据并行(data parallelism)和任务并行(task parallelism)。数据并行是指有大量的数据需要处理,并且每一块数据的处理过程基本上是彼此独立的。任务并行是指需要执行大量任务,并且每个任务的执行过程基本上是彼此独立的。任务并行可以是动态的,如果一个任务的执行结果会产生额外的任务,这些新增的任务也可以加入任务池。
实现数据并行有几种不同的做法。一种做法是使用Parallel.ForEach 方法,它类似于foreach 循环,应尽可能使用这种做法。在3.1 节将会详细介绍Parallel.ForEach 方法。Parallel 类也提供Parallel.For 方法,这类似于for 循环,当数据处理过程基于一个索引时,可使用这个方法。下面是使用Parallel.ForEach 的代码例子:
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
另一种做法是使用PLINQ(Parallel LINQ), 它为LINQ 查询提供了AsParallel 扩展。跟PLINQ 相比,Parallel 对资源更加友好,Parallel 与系统中的其他进程配合得比较好, 而PLINQ 会试图让所有的CPU 来执行本进程。Parallel 的缺点是它太明显。很多情况下,PLINQ 的代码更加优美。PLINQ 在3.5 节有详细介绍:
IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
return values.AsParallel().Select(val => IsPrime(val));
}
不管选用哪种方法,在并行处理时有一个非常重要的准则。
每个任务块要尽可能的互相独立。
只要任务块是互相独立的,并行性就能做到最大化。一旦你在多个线程中共享状态,就必须以同步方式访问这些状态,那样程序的并行性就变差了。第11 章将详细讲述同步。有多种方式可以控制并行处理的输出。可以把结果存在某些并发集合,或者对结果进行聚合。聚合在并行处理中很常见,Parallel 类的重载方法,也支持这种map/reduce 函数。关于聚合的详细内容在3.2 节。
下面讲任务并行。数据并行重点在处理数据,任务并行则关注执行任务。
Parallel 类的Parallel.Invoke 方法可以执行“分叉/ 联合”(fork/join)方式的任务并行。
3.3 节将详细介绍这个方法。调用该方法时,把要并行执行的委托(delegate)作为传入参数:
void ProcessArray(double[] array)
{
Parallel.Invoke(
() => ProcessPartialArray(array, 0, array.Length / 2),
() => ProcessPartialArray(array, array.Length / 2, array.Length)
);
}
void ProcessPartialArray(double[] array, int begin, int end)
{
// CPU 密集型的操作……
}
现在Task 这个类也被用于异步编程,但当初它是为了任务并行而引入的。任务并行中使用的一个Task 实例表示一些任务。可以使用Wait 方法等待任务完成,还可以使用Result和Exception 属性来检查任务执行的结果。直接使用Task 类型的代码比使用Parallel 类要复杂,但是,如果在运行前不知道并行任务的结构,就需要使用Task 类型。如果使用动态并行机制,在开始处理时,任务块的个数是不确定的,只有继续执行后才能确定。通常情况下,一个动态任务块要启动它所需的所有子任务,然后等待这些子任务执行完毕。为实现这个功能,可以使用Task 类型中的一个特殊标志:TaskCreationOptions.AttachedToParent。动态并行机制在3.4 节中详述。
跟数据并行一样,任务并行也强调任务块的独立性。委托(delegate)的独立性越强,程序的执行效率就越高。在编写任务并行程序时,要格外留意下闭包(closure)捕获的变量。记住闭包捕获的是引用(不是值),因此可以在结束时以不明显地方式地分享这些变量。对所有并行处理类型来讲,错误处理的方法都差不多。由于操作是并行执行的,多个异常就会同时发生,系统会把这些异常封装在AggregateException 类里,在程序中抛给代码。这一特点对所有方法都是一样的,包括Parallel.ForEach、Paralle.lInvoke、Task.Wait 等。AggregateException 类型有几个实用的Flatten 和Handle 方法,用来简化错误处理的代码:
try
{
Parallel.Invoke(
() => { throw new Exception(); },
() => { throw new Exception(); }
);
}
catch (AggregateException ex)
{
ex.Handle(exception =>
{
Console.WriteLine(exception);
return true; // “已经处理”
});
}
通常情况下,没必要关心线程池处理任务的具体做法。数据并行和任务并行都使用动态调整的分割器,把任务分割后分配给工作线程。线程池在需要的时候会增加线程数量。线程池线程使用工作窃取队列(work-stealing queue)。微软公司为了让每个部分尽可能高效,做了很多优化。要让程序得到最佳的性能,有很多参数可以调节。只要任务时长不是特别短,采用默认设置就会运行得很好。
任务不要特别短,也不要特别长。
如果任务太短,把数据分割进任务和在线程池中调度任务的开销会很大。如果任务太长,线程池就不能进行有效的动态调整以达到工作量的平衡。很难确定“太短”和“太长”的判断标准,这取决于程序所解决问题的类型以及硬件的性能。根据一个通用的准则,只要没有导致性能问题,我会让任务尽可能短(如果任务太短,程序性能会突然降低)。更好的做法是使用Parallel 类型或者PLINQ,而不是直接使用任务。这些并行处理的高级形式,自带有自动分配任务的算法(并且会在运行时自动调整)。
要更深入的了解并行编程,这方面最好的书是Colin Campbell 等人编写的Parallel Programming
with Microsoft.NET(微软出版社)。
1.4 响应式编程简介
跟并发编程的其他形式相比,响应式编程的学习难度较大。如果对响应式编程不是非常熟悉,代码维护相对会更难一点。一旦你学会了,就会发现响应式编程的功能特别强大。响应式编程可以像处理数据流一样处理事件流。根据经验,如果事件中带有参数,那么最好采用响应式编程,而不是常规的事件处理程序。响应式编程基于“可观察的流”(observable stream)这一概念。你一旦申请了可观察流,就可以收到任意数量的数据项(OnNext),并且流在结束时会发出一个错误(OnError)或一个
“流结束”的通知(OnCompleted)。有些可观察流是不会结束的。实际的接口就像这样:
interface IObserver<in T>
{
void OnNext(T item);
void OnCompleted();
void OnError(Exception error);
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
不过,开发人员不需要实现这些接口。微软的Reactive Extensions(Rx)库已经实现了所有接口。响应式编程的最终代码非常像LINQ,可以认为它就是“LINQ to events”。下面的代码中,前面是我们不熟悉的操作符(Interval 和Timestamp),最后是一个Subscribe,但是中间部分是我们在LINQ 中熟悉的操作符:Where 和Select。LINQ 具有的特性,Rx也都有。Rx 在此基础上增加了很多它自己的操作符,特别是与时间有关的操作符:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp)
.Subscribe(x => Trace.WriteLine(x));
上面的代码中,首先是一个延时一段时间的计数器(Interval),随后、后为每个事件加了一个时间戳(Timestamp)。接着对事件进行过滤,只包含偶数值(Where),选择了时间
戳的值(Timestamp),然后当每个时间戳值到达时,把它输入调试器(Subscribe)。如果
没有理解上述新的操作符(例如Interval),不要紧,我们会在后面讲述。现在只要记住
这是一个LINQ 查询,与你以前见过的LINQ 查询很类似。主要区别在于:LINQ to Object
和LINQ to Entity 使用“拉取”模式,LINQ 的枚举通过查询拉出数据。而LINQ to event(Rx)使用“推送”模式,事件到达后就自行穿过查询。
可观察流的定义和其订阅是互相独立的。上面最后一个例子与下面的代码等效:
IObservable<DateTimeOffset> timestamps =Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp);
timestamps.Subscribe(x => Console.WriteLine(x));
一种常规的做法是把可观察流定义为一种类型,然后将其作为IObservable 资源使用。
其他类型可以订阅这些流,或者把这些流与其他操作符组合,创建另一个可观察流。
Rx 的订阅也是一个资源。Subscribe 操作符返回一个IDisposable,即表示订阅完成。当你
响应了那个可观察流,就得处理这个订阅。对于hot observable(热可观察流)和cold observable(冷可观察流)这两种对象,订阅的做法各有不同。一个hot observable 对象是指一直在发生的事件流,如果在事件到达时没有订阅者,事件就丢失了。例如,鼠标的移动就是一个hot observable 对象。old observable 对象是始终没有输入事件(不会主动产生事件)的观察流,它只会通过启动一个事件队列来响应订阅。例如,HTTP 下载是一个cold observable 对象,只有在订阅后才会发出HTTP 请求。
同样,所有Subscribe 操作符都需要有处理错误的参数。前面的例子没有错误处理参数。
下面则是一个更好的例子,在可观察流发生错误时,它能正确处理:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp)
.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex));
在进行Rx 实验性编程时,Subject 这个类型很有用。这个“subject”就像手动实现一
个可观察流。可以在代码中调用OnNext、OnError 和OnCompleted,这个subject 会把这些
调用传递给订阅者。Subject 用于实验时效果非常不错,但在实际产品开发时,应该使
用第5 章介绍的操作符。
Rx 的操作符非常多,本书只介绍了一部分。想了解关于Rx 的更多信息,建议阅读优秀的
在线图书Introduction to Rx。
1.5 数据流简介
TPL 数据流很有意思,它把异步编程和并行编程这两种技术结合起来。如果需要对数据进行一连串的处理,TPL 数据流就很有用。例如,需要从一个URL 上下载数据,接着解析数据,然后把它与其他数据一起做并行处理。TPL 数据流通常作为一个简易的管道,数据从管道的一端进入,在管道中穿行,最后从另一端出来。不过,TPL 数据流的功能比普通管道要强大多了。对于处理各种类型的网格(mesh),在网格中定义分叉(fork)、连接(join)、循环(loop)的工作,TPL 数据流都能正确地处理。当然了,大多数时候TPL 数据流网格还是被用作管道。
数据流网格的基本组成单元是数据流块(dataflow block)。数据流块可以是目标块(接收数据)或源块(生成数据),或两者皆可。源块可以连接到目标块,创建网格。连接的具体内容在4.1 节介绍。数据流块是半独立的,当数据到达时,数据流块会试图对数据进行处理,并且把处理结果推送给下一个流程。使用TPL 数据流的常规方法是创建所有的块,再把它们链接起来,然后开始在一端填入数据。然后,数据会自行从另一端出来。再强调一次,数据流的功能比这要强大得多,数据穿过的同时,可能会断开连接、创建新的块并加入到网格,不过这是非常高级的使用场景。
目标块带有缓冲区,用来存放收到的数据。因此,在还来不及处理数据的时候,它仍能接收新的数据项,这就让数据可以持续地在网格上流动。在有分叉的情况下,一个源块链接了两个目标块,这种缓冲机制就会产生问题。当源块有数据需要传递下去时,它会把数据传给与它链接的块,并且一次只传一个数据。默认情况下,第一个目标块会接收数据并缓存起来,而第二个目标块就收不到任何数据。解决这个问题的方法是把目标块设置为“非贪婪”模式,以限制缓冲区的数量,这部分将在4.4 节介绍。
如果某些步骤出错,例如委托在处理数据项时抛出异常,数据流块就会出错。数据流块出错后就会停止接收数据。默认情况下,一个块出错不会摧毁整个网格。这让程序有能力重建部分网格,或者对数据重新定向。然而这是一个高级用法。通常来讲,你是希望这些错误通过链接传递给目标块。数据流也提供这个选择,唯一比较难办的地方是当异常通过链接传递时,它就会被封装AggregateException 类中。因此,如果管道很长,最后异常的嵌套层次会非常多,这时就可以使AggregateException.Flatten 方法:
try
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock,new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1);
subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{
AggregateException ex = exception.Flatten();
Console.WriteLine(ex.InnerException);
}
数据流错误的处理方法将在4.2 节详细介绍。
数据流网格给人的第一印象是与可观察流非常类似,实际上它们确实有很多共同点。网格和流都有“数据项”这一概念,数据项从网格或流的中间穿过。还有,网格和流都有“正常完成”(表示没有更多数据需要接收时发出的通知)和“不正常完成”(在处理数据中发生错误时发出的通知)这两个概念。但是,Rx 和TPL 数据流的性能并不相同。如果执行并发编程概述 需要计时的任务,最好使用Rx 的observable 对象,而不是数据流块。如果进行并行处理,最好使用数据流块,而不是Rx 的observable 对象。从概念上说,Rx 更像是建立回调函数:observable 对象中的每个步骤都会直接调用下一步。相反,数据流网格中的每一块都是互相独立的。Rx 和TPL 数据流有各自的应用领域,也有一些交叉的领域。另一方面,Rx 和TPL 数据流也非常适合同时使用。Rx 和TPL 数据流的互操作性将在7.7 节详细介绍。
最常用的块类型有TransformBlock( 与LINQ 的Select 类似)、
TransformManyBlock(与LINQ 的SelectMany 类似)和ActionBlock
(为每个数据项运行一个委托)。要了解TPL 数据流的更多知识,建议阅读MSDN 的文档
和Guide to Implementing Custom TPL Dataflow Blocks。
1.6 多线程编程简介
线程是一个独立的运行单元,每个进程内部有多个线程,每个线程可以各自同时执行指令。每个线程有自己独立的栈,但是与进程内的其他线程共享内存。对某些程序来说,其中有一个线程是特殊的,例如用户界面程序有一个UI 线程,控制台程序有一个main 线程。
每个.NET 程序都有一个线程池,线程池维护着一定数量的工作线程,这些线程等待着执行分配下来的任务。线程池可以随时监测线程的数量。配置线程池的参数多达几十个,但是建议采用默认设置,线程池的默认设置是经过仔细调整的,适用于绝大多数现实中的应用场景。
应用程序几乎不需要自行创建新的线程。你若要为COM interop 程序创建SAT 线程,就得
创建线程,这是唯一需要线程的情况。
线程是低级别的抽象,线程池是稍微高级一点的抽象,当代码段遵循线程池的规则运行时,线程池就会在需要时创建线程。本书介绍的技术抽象级别更高:并行和数据流的处理队列会根据情况遵循线程池运行。抽象级别更高,正确代码的编写就更容易。基于这个原因,本书根本不介绍Thread 和BackgroundWorker 这两种类型。它们曾经非常流行,但那个时代已经过去了。
1.7 并发编程的集合
并发编程所用到的集合有两类:并发集合和不可变集合。这两种类别的集合将在第8 章详细介绍。多个线程可以用安全的方式同时更新并发集合。大多数并发集合使用快照(snapshot),当一个线程在增加或删除数据时,另一个线程也能枚举数据。比起给常规集合加锁以保护数据的方式,采用并发集合的方式要高效得多。
不可变集合则有些不同。不可变集合实际上是无法修改的。要修改一个不可变集合,需要建立一个新的集合来代表这个被修改了的集合。这看起来效率非常低,但是不可变集合的各个实例之间尽可能多地共享存储区,因此实际上效率没想象得那么差。不可变集合的优点之一,就是所有的操作都是简洁的,因此特别适合在函数式代码中使用。
1.8 现代设计
大多数并发编程技术有一个类似点:它们本质上都是函数式(functional)的。这里
“functional”的意思不是“实用,能完成任务”1,而是把它作为一种基于函数组合的编程模
式。如果你接受函数式的编程理念,并发编程的设计就会简单得多。
函数式编程的一个原则就是简洁(换言之,就是避免副作用)。解决方案中的每一个片段
都用一些值作为输入,生成一些值作为输出。应该尽可能避免让这些段落依赖于全局(或
共享)变量,或者修改全局(或共享)数据结构。不论这个片段是异步方法、并行任务、
Rx 操作还是数据流块,都应该这么做。当然了,具体做法迟早会受到计算内容的影响,
但如果能用简洁的段落来处理,然后用结果来执行更新,代码就会更加清晰。
函数式编程的另一个原则是不变性。不变性是指一段数据是不能被修改的。在并发编程中
使用不可变数据的原因之一,是程序永远不需要对不可变数据进行同步。数据不能修改,
这一事实让同步变得没有必要。不可变数据也能避免副作用。在编写本书时(2014 年),
虽然不可变数据还没有被广泛接受,但本书中有几节会介绍不可变数据结构。
1.9 技术要点总结
在.NET 刚推出时,就对异步编程提供了一定的支持。但是异步编程一直是很难的,直到2012 年.NET 4.5(同时发布C# 5.0 和VB 2012)引入async 和await 这两个关键字。本书中的异步编程方法,将全部采用现代的async/await。同时介绍一些方法,来实现async 和老式异步编程模式的交互。要支持老式平台的话,需要下载NuGet 包Microsoft.Bcl.Async。
不要在基于.NET 4.0 的ASP.NET 代码中使用Microsoft.Bcl.Async 进行异步编程!在.NET 中,ASP.NET 管道已经进行修改以支持async。对于异步ASP.NET 项目,必须使用.NET 4.5 或更高版本。
.NET 4.0 引入了并行任务库(TPL),完全支持数据并行和任务并行。但是一些资源较少的平台(例如手机),通常不支持TPL。TPL 是.NET 框架自带的。
Reactive Extensions 团队已经让它尽可能多地支持多种平台。Reactive Extensions 和async、await 一样,对所有类型的应用都有好处,包括客户端和服务器端应用。Rx 在NuGet 包
Rx-Main 中。TPL 数据流库只支持较新的平台,它的官方版本在NuGet 包Microsoft.Tpl.Dataflow 中。并发编程的集合是.NET 框架的一部分,但是不可变集合在NuGet 包Microsoft.Bcl.Immutable 中。表1-1 列出了各主流平台对各种技术的支持情况。