《C++ Concurrencyin Action》第8章--并发代码设计

简介: 《C++ Concurrencyin Action》第8章--并发代码设计

本章主要内容

· 线程间划分数据的技术

· 影响并发代码性能的因素

· 性能因素是如何影响数据结构的设计

· 多线程代码中的异常安全

· 可扩展性

· 并行算法的实现

之前章节着重于介绍使用C++11中的新工具来写并发代码。在第6、7章中我们了解到,如何使用这些工具来设计可并发访问的基本数据结构。这就好比一个木匠,其不仅要知道如何做一个合页,一个组合柜,或一个桌子;并发的代码的使用,要比使用/设计基本数据结构频繁的多。要将眼界放宽,就需要构建更大的结构,进行高效的工作。我将使用多线程化的C++标准库算法作为例子,不过这里的原则也适用于对其他应用程序的扩展。

认真思考如何进行并发化设计,对于每个编程项目来说都很重要。不过,写多线程代码的时候,需要考虑的因素比写序列化代码多得多。不仅包括一般性因素,例如:封装,耦合和聚合(这些在很多软件设计书籍中有很详细的介绍),还要考虑哪些数据需要共享,如何同步访问数据,哪些线程需要等待哪些线程,等等。

本章将会关注这些问题,从高层(但也是基本的)考虑,如何使用线程,哪些代码应该在哪些线程上执行;以及,这将如何影响代码的清晰度,并从底层细节上了解,如何构建共享数据来优化性能。

那么就先来看一下,如何在线程间划分工作。

8.1线程间划分工作的技术****

试想,你被要求负责建造一座房子。为了完成任务,你需要挖地基、砌墙、添加水暖、接入电线,等等。理论上,如果你很擅长建造屋子,那么这些事情都可以由你来完成,但是这样就要花费很长很长时间,并且需要不断的切换任务。或者,你可以雇佣一些人来帮助你完成房子的建造。那么现在你需要决定雇多少人,以及雇佣人员具有什么样的技能。比如,你可以雇几个人,这几个人什么都会。现在你还得不断的切换任务,不过因为雇佣了很多人,就要比之前的速度快很多。

或者,你可以雇佣一个包工队(专家组),由瓦工,木匠,电工和水管工组成。你的包工队员只做其擅长的,所以当没有水暖任务时,水管工会坐在那里休息,喝茶或咖啡。因为人多的缘故,要比之前一个人的速度快很多,并且水管工在收拾厕所的时候,电工可以将电线连接到厨房,不过当没有属于自己的任务时,有人就会休息。即使有人在休息,你可能还是能感觉到包工队要比雇佣一群什么都会的人快。包工队不需要更换工具,并且每个人的任务都要比会的人做的快。是快还是慢,取决于特定的情况——需要尝试,进行观察。

即使雇佣包工队,你依旧可以选择人数不同的团队(可能在一个团队中,瓦工的数量超过电工)。同样,这会是一种补足,并且在建造不止一座房子的时候,会改变整体效率。即使水管工没有太多的任务,在建造过一次房子后,你依旧能让他总是处于忙碌的状态。当包工队无事可做的时候,你是不会给他们钱的;即使每次工作只有那么几个人工作,你还需要负担整个团队的开销。

建造例子已经足够说明问题;这与线程所做的事情有什么关系呢?好吧,这些问题也会发生在线程上。你需要决定使用多少个线程,并且这些线程应该去做什么。还需要决定是使用“全能”的线程去完成所有的任务,还是使用“专业”线程只去完成一件事情,或将两种方法混合。使用并发的时候,需要作出诸多选择来驱动并发,这里的选择会决定代码的性能和清晰度。因此,这里的选择至关重要,所以在你设计应用程序的结构时,再作出适当的决定。在本节中,将看到很多划分任务的技术,就先从线程间划分数据开始吧!

8.1.1** 在线程处理前对数据进行划分**

最简单的并行算法,就是并行化的std::for_each,其会对一个数据集中每个元素执行同一个操作。为了并行化该算法,可以为数据集中每个元素分配一个处理线程。如何划分才能获得最佳的性能,很大程度上取决于数据结构实现的细节,在之后有关性能问题的章节会再提及此问题。

最简单的分配方式:第一组N个元素分配一个线程,下一组N个元素再分配一个线程,以此类推,如图8.1所示。不管数据怎么分,每个线程都会对分配给它的元素进行操作,不过并不会和其他线程进行沟通,直到处理完成。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s9j6jLrd-1657616056592)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image002.gif)]

图8.1 向线程分发连续的数据块

使用过MPI(Message PassingInterface)[1]和OpenMP[2]的人对这个结构一定很熟悉:一项任务被分割成多个,放入一个并行任务集中,执行线程独立的执行这些任务,结果在会有主线程中合并。这种方式在2.4节中的accumulate的例子中使用过了;在这个例子中,所有并行任务和主线程的任务都是累加和。对于for_each来说,主线程将无事可做,因为这个计算不需要最终处理。

最后一步对于并行程序来说十分重要;如清单2.8中那样原始的实现,最后一步就是一个串行的。不过,这一步同样也是能被并行化的;accumulate实际上是一个递减操作,所以清单2.8中,当线程数量大于一个线程上最小处理项时,可以对accumulate进行递归调用。或者,工作线程就像做一个完整的任务一样,对步骤进行递减,而非每次都产生新的线程。

虽然这个技术十分强大,但是并不是哪都适用。有时不能像之前那样,对任务进行整齐的划分,因为只有对数据进行处理后,才能进行明确的划分。这里特别适用了递归算法,就像快速排序;下面就来看看这种特别的方式。

8.1.2** 递归划分**

快速排序有两个最基本的步骤:将数据划分到中枢元素之前或之后,然后对中枢元素之前和之后的两半数组再次进行快速排序。这里不能通过对数据的简单划分达到并行,因为,只有在一次排序结束后,才能知道哪些项在中枢元素之前和之后。当要对这种算法进行并行化,很自然的会想到使用递归。每一级的递归都会多次调用quick_sort函数,因为需要知道哪些元素在中枢元素之前和之后。递归调用是完全独立的,因为其访问的是不同的数据集,并且每次迭代都能并发执行。图8.2展示了这样的递归划分。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sYI5N1Zs-1657616056593)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image004.gif)]

图 8.2 递归划分数据

在第4章中,已经见过这种实现。比起对大于和小于的数据块递归调用函数,使用std::async()可以为每一级生成小于数据块的异步任务。使用std::async()时,C++线程库就能决定何时让一个新线程执行任务,以及同步执行任务。

重要的是:对一个很大的数据集进行排序时,当每层递归都产生一个新线程,最后就会产生大量的线程。你会看到其对性能的影响,如果有太多的线程存在,那么你的应用将会运行的很慢。如果数据集过于庞大,会将线程耗尽。那么在递归的基础上进行任务的划分,就是一个不错的主意;你只需要将一定数量的数据打包后,交给线程即可。std::async()可以出里这种简单的情况,不过这不是唯一的选择。

另一种选择是使用std::thread::hardware_concurrency()函数来确定线程的数量,就像在清单2.8中的并行版accumulate()一样。然后,你可以将已排序的数据推到线程安全的栈上(如第6、7章中提及的栈)。当线程无所事事,不是已经完成对自己数据块的梳理,就是在等待一组排序数据的产生;线程可以从栈上获取这组数据,并且对其排序。

下面的代码就是使用以上方式进行的实现。

清单8.1 使用栈的并行快速排序算法——等待数据块排序

template
struct sorter // 1
{
struct chunk_to_sort
{
std::list<T> data;
std::promise promise;
};
thread_safe_stack chunks; // 2
std::vectorstd::threadthreads; // 3
unsigned const max_thread_count;
std::atomicend_of_data;
sorter():max_thread_count(std::thread::hardware_concurrency()-1),end_of_data(false){}
~sorter() //** 4**
{
end_of_data=true;  //** 5**
for(unsignedi=0;i<threads.size();++i)
{
  threads[i].join();  // **6**
}
}
void try_sort_chunk()
{
boost::shared_ptr chunk=chunks.pop(); // 7
if(chunk)
{
 sort_chunk(chunk);  // **8**
}
}
std::listdo_sort(std::list& chunk_data) // 9
{
if(chunk_data.empty())
{
  return chunk_data;
}
std::list<T> result;
result.splice(result.begin(),chunk_data,chunk_data.begin());
T const& partition_val=*result.begin();
//** 10**
typenamestd::list<T>::iterator divide_point= 
  std::partition(chunk_data.begin(),chunk_data.end(), [&](T const&val){return val<partition_val;});
chunk_tosort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),chunk_data,chunk_data.begin(),divide_point)
std::futurenew_lower=new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk)); // 11
if(threads.size()<max_thread_count)  //** 12**
{
 threads.push_back(std::thread(&sorter<T>::sort_thread,this));
}
std::list<T>new_higher(do_sort(chunk_data));
result.splice(result.end(),new_higher);
while(new_lower.wait_for(std::chrono::seconds(0)) !=std::future_status::ready) //** 13**
{
  try_sort_chunk();  // **14**
}

result.splice(result.begin(),new_lower.get());

return result;
}
voidsort_chunk(boost::shared_ptr const& chunk)
{
chunk->promise.set_value(do_sort(chunk->data)); //** 15**
}
void sort_thread()
{
while(!end_of_data)  // **16**
{
  try_sort_chunk();  // **17**
 std::this_thread::yield();  // **18**
}
}
};
template
std::list parallel_quick_sort(std::list input) // 19
{
if(input.empty())
{
return input;
}
sorter s;
return s.do_sort(input); // 20
}

这里,parallel_quick_sort函数⑲代表了sorter类①的功能,其支持在栈上简单的存储无序数据块②,并且对线程进行设置③。do_sort成员函数⑨主要做的就是对数据进行划分⑩。相较于对每一个数据块产生一个新的线程,这次会将这些数据块推到栈上⑪;并在有备用处理器⑫的时候,产生新线程。因为小于部分的数据块可能由其他线程进行处理,那么就得等待这个线程完成⑬。为了让所有事情顺利进行(只有一个线程和其他所有线程都忙碌时),当线程处于等待状态时⑭,就让当前线程尝试处理栈上的数据。try_sort_chunk只是从栈上弹出一个数据块⑦,并且对其进行排序⑧,将结果存在promise中,让线程对已经存在于栈上的数据块进行提取⑮。

当end_of_data没有被设置时⑯,新生成的线程还在尝试从栈上获取需要排序的数据块⑰。在循环检查中,也要给其他线程机会⑱,可以从栈上取下数据块进行更多的操作。这里的实现依赖于sorter类④对线程的清理。当所有数据都已经排序完成,do_sort将会返回(即使还有工作线程在运行),所以主线程将会从parallel_quick_sort⑳中返回,在这之后会销毁sorter对象。析构函数会设置end_of_data标志⑤,以及等待所有线程完成工作⑥。标志的设置将终止线程函数内部的循环⑯。

在这个方案中,不用为spawn_task产生的无数线程所困扰,并且也不用再依赖C++线程库,为你选择执行线程的数量(就像std::async()那样)。该方案制约线程数量的值就是std::thread::hardware_concurrency()的值,这样就能避免任务过于频繁的切换。不过,这里还有两个问题:线程管理和线程通讯。要解决这两个问题就要增加代码的复杂程度。虽然,线程对数据项是分开处理的,不过所有对栈的访问,都可以向栈添加新的数据块,并且移出数据块以作处理。这里重度的竞争会降低性能(即使使用无锁(无阻塞)栈),原因将会在后面提到。

这个方案使用到了一个特殊的线程池——所有线程的任务都来源于一个等待链表,然后线程会去完成任务,完成任务后会再来链表提取任务。这个线程池很有问题(包括对工作链表的竞争),这个问题的解决方案将在第9章提到。关于多处理器的问题,将会在本章后面的章节中做出更为详细的介绍(详见8.2.1)。

几种划分方法:1,处理前划分;2,递归划分(都需要事先知道数据的长度固定),还有上面的那种划分方式。事情并非总是这样好解决;当数据是动态生成,或是通过外部输入,那么这里的办法就不适用了。在这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。

8.1.3** 通过任务类型划分工作**

虽然为每个线程分配不同的数据块,但工作的划分(无论是之前就划分好,还是使用递归的方式划分)仍然在理论阶段,因为这里每个线程对每个数据块的操作是相同的。而另一种选择是让线程做专门的工作,也就是每个线程做不同的工作,就像水管工和电工在建造一所屋子的时候所做的不同工作那样。线程可能会对同一段数据进行操作,但它们对数据进行不同的操作。

对分工的排序,也就是从并发分离关注结果;每个线程都有不同的任务,这就意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理;不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是基本良好的设计,每一段代码只对自己的部分负责。

分离关注

当有多个任务需要持续运行一段时间,或需要及时进行处理的事件(比如,按键事件或传入网络数据),且还有其他任务正在运行时,单线程应用采用的是单职责原则处理冲突。单线程的世界中,代码会执行任务A(部分)后,再去执行任务B(部分),再检查按钮事件,再检查传入的网络包,然后在循环回去,执行任务A。这将会使得任务A复杂化,因为需要存储完成状态,以及定期从主循环中返回。如果在循环中添加了很多任务,那么程序将运行的很慢;并且用户会发现,在他/她按下按键后,很久之后才会有反应。我确定你已经在一些程序中见过这种情况:你给程序分配一项任务后,发现接口会封锁,直到这项任务完成。

当使用独立线程执行任务时,操作系统会帮你处理接口问题。在执行任务A时,线程可以专注于执行任务,而不用为保存状态从主循环中返回。操作系统会自动保存状态,当需要的时候,将线程切换到任务B或任务C。如果目标系统是带有多核或多个处理器,任务A和任务B可很可能真正的并发执行。这样处理按键时间或网络包的代码,就能及时执行了。所有事情都完成的很好,用户得到了及时的响应;当然,作为开发者只需要写具体操作的代码即可,不用再将控制分支和使用用户交互混在一起了。

听起来不错,玫瑰色的愿景呀。事实真像上面所说的那样简单?一切取决于细节。如果每件事都是独立的,那么线程间就不需要交互,这样的话一切都很简单了。不幸的是,现实没那么美好。后台那些优雅的任务,经常会被用户要求做一些事情,并且它们需要通过更新用户接口的方式,来让用户知道它们完成了任务。或者,用户可能想要取消任务,这就需要用户向接口发送一条消息,告知后台任务停止运行。这两种情况都需要认真考虑,设计,以及适当的同步,不过这里担心的部分还是分离的。用户接口线程只能处理用户接口,当其他线程告诉该线程要做什么时,用户接口线程会进行更新。同样,后台线程只运行它们所关注的任务;只是,有时会发生“允许任务被其他线程所停止”的情况。在这两种情况下,后台线程需要照顾来自其他线程的请求,线程本身只知道它们请求与自己的任务有所关联。

多线程下有两个危险需要分离关注。第一个是对错误担忧的分离,主要表现为线程间共享着很多的数据,或者不同的线程要相互等待;这两种情况都是因为线程间很密切的交互。当这种情况发生,就需要看一下为什么需要这么多交互。当所有交互都有关于同样的问题,就应该使用单线程来解决,并将引用同一原因的线程提取出来。或者,当有两个线程需要频繁的交流,且没有其他线程时,那么就可以将这两个线程合为一个线程。

当通过任务类型对线程间的任务进行划分时,不应该让线程处于完全隔离的状态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段,来让每个线程执行。

划分任务序列

当任务会应用到相同操作序列,去处理独立的数据项时,就可以使用流水线(pipeline)系统进行并发。这好比一个物理管道:数据流从管道一端进入,在进行一系列操作后,从管道另一端出去。

使用这种方式划分工作,可以为流水线中的每一阶段操作创建一个独立线程。当一个操作完成,数据元素会放在队列中,以供下一阶段的线程提取使用。这就允许第一个线程在完成对于第一个数据块的操作,并要对第二个数据块进行操作时,第二个线程可以对第一个数据块执行管线中的第二个操作。

这就是在线程间划分数据的一种替代方案(如8.1.1描述);这种方式适合于操作开始前,且对输入数据处长度不清楚的情况。例如,数据来源可能是从网络,或者可能是通过扫描文件系统来确定要处理的文件。

流水线对于队列中耗时的操作处理的也很合理;通过对线程间任务的划分,就能对应用的性能所有改善。假设有20个数据项,需要在四核的机器上处理,并且每一个数据项需要四个步骤来完成操作,每一步都需要3秒来完成。如果你将数据分给了四个线程,那么每个线程上就有5个数据项要处理。假设在处理的时候,没有其他线程对处理过程进行影响,在12秒后4个数据项处理完成,24秒后8个数据项处理完成,以此类推。当20个数据项都完成操作,就需要1分钟的时间。在管线中就会完全不同。四步可以交给四个内核。那么现在,第一个数据项可以被每一个核进行处理,所以其还是会消耗12秒。的确,在12秒后你就能得到一个处理过的数据项,这相较于数据划分并没有好多少。不过,当流水线流动起来,事情就会不一样了;在第一个核处理第一个数据项后,数据项就会交给下一个内核,所以第一个核在处理完第一个数据项后,其还可以对第二个数据项进行处理。那么在12秒后,每3秒将会得到一个已处理的数据项,这就要好于每隔12秒完成4个数据项。

为什么整批处理的时间要长于流水线呢?因为你需要在最终核开始处理第一个元素前等待9秒。更平滑的操作,能在某些情况下获益更多。考虑如下情况:当一个系统用来播放高清数字视频。为了让视频能够播放,你至少要保证25帧每秒的解码速度。同样的,这些图像需要有均匀的间隔,才会给观众留有连续播放的感觉;一个应用可以在1秒解码100帧,不过在解完就需要暂停1s的时候,这个应用就没有意义了。另一方面,观众能接受在视频开始播放的时候有一定的延迟。这种情况,并行使用流水线就能得到稳定的解码率。

看了这么多线程间划分工作的技术,接下来让我们来看一下在多线程系统中有哪些因素会影响性能,并且这些因素是如何影响你选择划分方案的。

8.2影响并发代码性能的因素****

在多处理系统中,使用并发的方式来提高代码的效率时,你需要了解一下有哪些因素会影响并发的效率。即使已经使用多线程对关注进行分离,还需要确定是否会对性能造成负面影响。因为,在崭新16核机器上应用的速度与单核机器相当时,用户是不会打死你的。

之后你会看到,在多线程代码中有很多因素会影响性能——对线程处理的数据做一些简单的改动(其他不变),都可能对性能产生戏剧性的效果。所以,多言无益,让我们来看一下这些因素吧,从明显的开始:目标系统有多少个处理器?

8.2.1** 有多少个处理器?**

处理器个数是影响多线程应用的首要因素。在某些情况下,你对目标硬件会很熟悉,并且针对硬件进行设计,并在目标系统或副本上进行测量。如果是这样,那你很幸运;不过,要知道这些都是很奢侈的。你可能在一个类似的平台上进行开发,不过你所使用的平台与目标平台的差异很大。例如,你可能会在一个双芯或四芯的系统上做开发,不过你的用户系统可能就只有一个处理器(可能有很多芯),或多个单芯处理器,亦或是多核多芯的处理器。在不同的平台上,并发程序的行为和性能特点就可能完全不同,所以你需要仔细考虑那些地方会被影响到,如果会被影响,就需要在不同平台上进行测试。

一个单核16芯的处理器和四核双芯或十六核单芯的处理器相同:在任何系统上,都能运行16个并发线程。当线程数量少于16个时,会有处理器处于空闲状态(除非系统同时需要运行其他应用,不过我们暂时忽略这种可能性)。另一方面,当多于16个线程在运行的时候(都没有阻塞或等待),应用将会浪费处理器的运算时间在线程间进行切换,如第1章所述。这种情况发生时,我们称其为超额认购(oversubscription)。

为了扩展应用线程的数量,与硬件所支持的并发线程数量一致,C++标准线程库提供了std::thread::hardware_concurrency()。使用这个函数就能知道在给定硬件上可以扩展的线程数量了。

需要谨慎使用std::thread::hardware_concurrency(),因为代码不会考虑有其他运行在系统上的线程(除非已经将系统信息进行共享)。最坏的情况就是,多线程同时调用std::thread::hardware_concurrency()函数来对线程数量进行扩展,这样将导致庞大的超额认购。std::async()就能避免这个问题,因为标准库会对所有的调用进行适当的安排。同样,谨慎的使用线程池也可以避免这个问题。

不过,即使你已经考虑到所有在应用中运行的线程,程序还要被同时运行的其他程序所影响。虽然,在单用户系统中,使用多个CPU密集型应用程序很罕见,但在某些领域,这种情况就很常见了。虽然系统能提供选择线程数量的机制,但这种机制已经超出C++标准的范围。这里的一种选择是使用与std::async()类似的工具,来为所有执行异步任务的线程的数量做考虑;另一种选择就是,限制每个应用使用的处理芯个数。我倒是希望,这种限制能反映到std::thread::hardware_concurrency()上面(不能保证)。如果你需要处理这种情况,可以看一下你所使用的系统说明,了解一下是否有相关选项可供使用。

理想算法可能会取决于问题规模与处理单元的比值。大规模并行系统中有很多的处理单元,算法可能就会同时执行很多操作,让应用更快的结束;这就要快于执行较少操作的平台,因为该平台上的每一个处理器只能执行很少的操作。

随着处理器数量的增加,另一个问题就会来影响性能:多个处理器尝试访问同一个数据。

8.2.2** 数据争用与乒乓缓存**

当两个线程并发的在不同处理器上执行,并且对同一数据进行读取,通常不会出现问题;因为数据将会拷贝到每个线程的缓存中,并且可以让两个处理器同时进行处理。不过,当有线程对数据进行修改的时候,这个修改需要更新到其他核芯的缓存中去,就要耗费一定的时间。根据线程的操作性质,以及使用到的内存序,这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。即便是精确的时间取决于硬件的物理结构,不过根据CPU指令,这是一个特别特别慢的操作,相当于执行成百上千个独立指令。

思考下面简短的代码段:

std::atomic counter(0);
void processing_loop()
{
while(counter.fetch_add(1,std::memory_order_relaxed)<100000000)
{
do_something();
• 1

}

}

counter变量是全局的,所以任何线程都能调用processing_loop()去修改同一个变量。因此,当新增加的处理器时,counter变量必须要在缓存内做一份拷贝,再改变自己的值,或其他线程以发布的方式对缓存中的拷贝副本进行更新。即使用std::memory_order_relaxed,编译器不会为任何数据做同步操作,fetch_add是一个“读-改-写”操作,因此就要对最新的值进行检索。如果另一个线程在另一个处理器上执行同样的代码,counter的数据需要在两个处理器之间进行传递,那么这两个处理器的缓存中间就存有counter的最新值(当counter的值增加时)。如果do_something()足够短,或有很多处理器来对这段代码进行处理时,处理器将会互相等待;一个处理器准备更新这个值,另一个处理器正在修改这个值,所以该处理器就不得不等待第二个处理器更新完成,并且完成更新传递时,才能执行更新。这种情况被称为高竞争(high contention)。如果处理器很少需要互相等待,那么这种情况就是低竞争(low contention)。

在这个循环中,counter的数据将在每个缓存中传递若干次。这就叫做乒乓缓存(cache ping-pong),这种情况会对应用的性能有着重大的影响。当一个处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情,所以对于整个应用来说,这就是一个坏消息。

你可能会想,这种情况不会发生在你身上;因为,你没有使用任何循环。你确定吗?那么互斥锁呢?如果你需要在循环中放置一个互斥量,那么你的代码就和之前从数据访问的差不多了。为了锁住互斥量,另一个线程必须将数据进行转移,就能弥补处理器的互斥性,并且对数据进行修改。当这个过程完成时,将会再次对互斥量进行修改,并对线程进行解锁,之后互斥数据将会传递到下一个需要互斥量的线程上去。转移时间,就是第二个线程等待第一个线程释放互斥量的时间:

std::mutex m;
my_data data;
void processing_loop_with_mutex()
{
while(true)
{
std::lock_guardstd::mutex lk(m);
if(done_processing(data)) break;

}

}

接下来看看最糟糕的部分:数据和互斥量已经准备好让多个线程进访问之后,当系统中的核心数和处理器数量增加时,很可能看到高竞争,以及一个处理器等待其他处理器的情况。如果在多线程情况下,能更快的对同样级别的数据进行处理,线程就会对数据和互斥量进行竞争。这里有很多这样的情况,很多线程会同时尝试对互斥量进行获取,或者同时访问变量,等等。

互斥量的竞争通常不同于原子操作的竞争,最简单的原因是,互斥量通常使用操作系统级别的序列化线程,而非处理器级别的。如果有足够的线程去执行任务,当有线程在等待互斥量时,操作系统会安排其他线程来执行任务,而处理器只会在其他线程运行在目标处理器上时,让该处理器停止工作。不过,对互斥量的竞争,将会影响这些线程的性能;毕竟,只能让一个线程在同一时间运行。

回顾第3章,一个很少更新的数据结构可以被一个“单作者,多读者”互斥量(详见3.3.2)。乒乓缓存效应可以抵消互斥所带来的收益(工作量不利时),因为所有线程访问数据(即使是读者线程)都会对互斥量进行修改。随着处理器对数据的访问次数增加,对于互斥量的竞争就会增加,并且持有互斥量的缓存行将会在核芯中进行转移,因此会增加不良的锁获取和释放次数。有一些方法可以改善这个问题,其本质就是让互斥量对多行缓存进行保护,不过这样的互斥量需要自己去实现。

如果乒乓缓存是一个糟糕的现象,那么该怎么避免它呢?在本章后面,答案会与提高并发潜能的指导意见相结合:减少两个线程对同一个内存位置的竞争。

虽然,要实现起来并不简单。即使给定内存位置被一个线程所访问,可能还是会有乒乓缓存的存在,是因为另一种叫做伪共享(false sharing)的效应。

8.2.3** 伪共享**

处理器缓存通常不会用来处理在单个存储位置,但其会用来处理称为缓存行(cache lines)的内存块。内存块通常大小为32或64字节,实际大小需要由正在使用着的处理器模型来决定。因为硬件缓存进处理缓存行大小的内存块,较小的数据项就在同一内存行的相邻内存位置上。有时,这样的设定还是挺不错:当线程访问的一组数据是在同一数据行中,对于应用的性能来说就要好于向多个缓存行进行传播。不过,当在同一缓存行存储的是无关数据,且需要被不同线程访问,这就会造成性能问题。

假设你有一个int类型的数组,并且有一组线程可以访问数组中的元素,且对数组的访问很频繁(包括更新)。通常int类型的大小要小于一个缓存行,同一个缓存行中可以存储多个数据项。因此,即使每个线程都能对数据中的成员进行访问,硬件缓存还是会产生乒乓缓存。每当线程访问0号数据项,并对其值进行更新时,缓存行的所有权就需要转移给执行该线程的处理器,这仅是为了让更新1号数据项的线程获取1号线程的所有权。缓存行是共享的(即使没有数据存在),因此使用伪共享来称呼这种方式。这个问题的解决办法就是对数据进行构造,让同一线程访问的数据项存在临近的内存中(就像是放在同一缓存行中),这样那些能被独立线程访问的数据将分布在相距很远的地方,并且可能是存储在不同的缓存行中。在本章接下来的内容中看到,这种思路对代码和数据设计的影响。

如果多线程访问同一内存行是一种糟糕的情况,那么在单线程下的内存布局将会如何带来哪些影响呢?

8.2.4** 如何让数据紧凑?**

伪共享发生的原因:某个线程所要访问的数据过于接近另一线程的数据,另一个是与数据布局相关的陷阱会直接影响单线程的性能。问题在于数据过于接近:当数据能被单线程访问时,那么数据就已经在内存中展开,就像是分布在不同的缓存行上。另一方面,当内存中有能被单线程访问紧凑的数据时,就如同数据分布在同一缓存行上。因此,当数据已传播,那么将会有更多的缓存行将会从处理器的缓存上加载数据,这会增加访问内存的延迟,以及降低数据的系能(与紧凑的数据存储地址相比较)。

同样的,如果数据已传播,在给定缓存行上就即包含于当前线程有关和无关的数据。在极端情况下,当有更多的数据存在于缓存中,你会对数据投以更多的关注,而非这些数据去做了什么。这就会浪费宝贵的缓存空间,增加处理器缓存缺失的情况,即使这个数据项曾经在缓存中存在过,还需要从主存中添加对应数据项到缓存中,因为在缓存中其位置已经被其他数据所占有。

现在,对于单线程代码来说就很关键了,何至于此呢?原因就是任务切换(task switching)。如果系统中的线程数量要比核芯多,每个核上都要运行多个线程。这就会增加缓存的压力,为了避免伪共享,努力让不同线程访问不同缓存行。因此,当处理器切换线程的时候,就要对不同内存行上的数据进行重新加载(当不同线程使用的数据跨越了多个缓存行时),而非对缓存中的数据保持原样(当线程中的数据都在同一缓存行时)。

如果线程数量多于内核或处理器数量,操作系统可能也会选择将一个线程安排给这个核芯一段时间,之后再安排给另一个核芯一段时间。因此就需要将缓存行从一个内核上,转移到另一个内核上;这样的话,就需要转移很多缓存行,也就意味着要耗费很多时间。虽然,操作系统通常避免这样的情况发生,不过当其发生的时候,对性能就会有很大的影响。

当有超级多的线程准备运行时(非等待状态),任务切换问题就会频繁发生。这个问题我们之前也接触过:超额认购(oversubscription)。

8.2.5** 超额认购和频繁的任务切换**

多线程系统中,通常线程的数量要多于处理的数量。不过,线程经常会花费时间来等待外部I/O完成,或被互斥量阻塞,或等待条件变量,等等;所以等待不是问题。应用使用额外的线程来完成有用的工作,而非让线程在处理器处以闲置状态时继续等待。

这也并非长久之计,如果有很多额外线程,就会有很多线程准备执行,而且数量远远大于可用处理器的数量,不过操作系统就会忙于在任务间切换,以确保每个任务都有时间运行。如第1章所见,这将增加切换任务的时间开销,和缓存问题造成同一结果。当无限制的产生新线程,超额认购就会加剧,如第4章的递归快速排序那样;或者在通过任务类型对任务进行划分的时候,线程数量大于处理器数量,这里对性能影响的主要来源是CPU的能力,而非I/O。

如果只是简单的通过数据划分生成多个线程,那可以限定工作线程的数量,如8.1.2节中那样。如果超额认购是对工作的天然划分而产生,那么不同的划分方式对这种问题就没有太多益处了。之前的情况是,需要选择一个合适的划分方案,可能需要对目标平台有着更加详细的了解,不过这也只限于性能已经无法接受,或是某种划分方式已经无法提高性能的时候。

其他因素也会影响多线程代码的性能。即使CPU类型和时钟周期相同,乒乓缓存的开销可以让程序在两个单核处理器和在一个双核处理器上,产生巨大的性能差,不过这只是那些对性能影响可见的因素。接下来,让我们看一下这些因素如何影响代码与数据结构的设计。

8.3为多线程性能设计数据结构****

8.1节中,我们看到了各种划分方法;并且在8.2节,了解了对性能影响的各种因素。如何在设计数据结构的时候,使用这些信息提高多线程代码的性能?这里的问题与第6、7章中的问题不同,之前是关于如何设计能够安全、并发访问的数据结构。在8.2节中,单线程中使用的数据布局就会对性能产生巨大冲击(即使数据并未与其他线程进行共享)。

关键的是,当为多线程性能而设计数据结构的时候,需要考虑竞争(contention),伪共享(false sharing)和数据距离(data proximity)。这三个因素对于性能都有着重大的影响,并且你通常可以改善的是数据布局,或者将赋予其他线程的数据元素进行修改。首先,让我们来看一个轻松方案:线程间划分数组元素。

8.3.1** 为复杂操作划分数组元素**

假设你有一些偏数学计算任务,比如,需要将两个很大的矩阵进行相乘。对于矩阵相乘来说,将第一个矩阵中的首行每个元素和第二个矩阵中首列每个元素相乘后,再相加,从而产生新矩阵中左上角的第一个元素。然后,第二行和第一列,产生新矩阵第一列上的第二个结果,第二行和第二列,产生新矩阵中第二列的第一个结果,以此类推。如图8.3所示,高亮展示的就是在新矩阵中第二行-第三列中的元素产生的过程。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sUiOUnXT-1657616056594)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image006.gif)]

图8.3 矩阵相乘

现在,让我们假设两个矩阵都有上千行和上千列,为了使用多线程来优化矩阵乘法。通常,非稀疏矩阵可以用一个大数组来代表,也就是第二行的元素紧随着第一行的,以此类推。为了完成矩阵乘法,这里就需要三个大数组。为了优化性能,你需要仔细考虑数据访问的模式,特别是向第三个数组中写入的方式。

线程间划分工作是有很多种方式的。假设矩阵的行或列数量大于处理器的数量,可以让每个线程计算出结果矩阵列上的元素,或是行上的元素,亦或计算一个子矩阵。

回顾一下8.2.3和8.2.4节,对于一个数组来说,访问连续的元素是最好的方式,因为这将会减少缓存的使用,并且降低伪共享的概率。如果要让每个线程处理几行,线程需要读取第一个矩阵中的每一个元素,并且读取第二个矩阵上的相关行上的数据,不过这里只需要对列的值进行写入。给定的两个矩阵是以行连续的方式存储,这就意味着当你访问第一个矩阵的第一行的前N个元素,然后是第二行的前N个元素,以此类推(N是列的数量)。其他线程会访问每行的的其他元素;很明显的,应该访问相邻的列,所以从行上读取的N个元素也是连续的,这将最大程度的降低伪共享的几率。当然,如果空间已经被N个元素所占有,且N个元素也就是每个缓存行上具体的存储元素数量,就会让伪共享的情况消失,因为线程将会对独立缓存行上的数据进行操作。

另一方面,当每个线程处理一组行,就需要读取第二个矩阵上的每一个数据,还要读取第一个矩阵中的相关行上的值,不过这里只需要对行上的值进行写入。因为矩阵是以行连续的方式存储,那么现在可以以N行的方式访问所有的元素。如果再次选择相邻行,这就意味着线程现在只能写入N行,这里就有不能被其他线程所访问的连续内存块。那么让线程对每组列进行处理就是一个改进,因为伪共享只可能有在一个内存块的最后几个元素和下一个元素的开始几个上发生,不过具体的时间还要根据目标架构来决定。

第三个选择——将矩阵分成小矩阵块?这可以看作先对列进行划分,再对行进行划分。因此,划分列的时候,同样有伪共享的问题存在。如果你可以选择内存块所拥有行的数量,就可以有效的避免伪共享;将大矩阵划分为小块,对于读取来说是有好处的:就不再需要读取整个源矩阵了。这里,只需要读取目标矩形里面相关行列的值就可以了。具体的来看,考虑1,000行和1,000列的两个矩阵相乘。就会有1百万个元素。如果有100个处理器,这样就可以每次处理10行的数据,也就是10,000个元素。不过,为了计算着10,000个元素,就需要对第二个矩阵中的全部内容进行访问(1百万个元素),再加上10,000个相关行(第一个矩阵)上的元素,大概就要访问1,010,000个元素。另外,硬件能处理100x100的数据块(总共10,000个元素),这就需要对第一个矩阵中的100行进行访问(100x1,000=100,000个元素),还有第二个矩阵中的100列(另外100,000个)。这才只有200,000个元素,就需要五轮读取才能完成。如果这里读取的元素少一些,缓存缺失的情况就会少一些,对于性能来说就好一些。

因此,将矩阵分成小块或正方形的块,要比使用单线程来处理少量的列好的多。当然,可以根据源矩阵的大小和处理器的数量,在运行时对块的大小进行调整。和之前一样,当性能是很重要的指标,就需要对目标架构上的各项指标进行测量。

如果不做矩阵乘法,该如何对上面提到的方案进行应用呢?同样的原理可以应用于任何情况,这种情况就是有很大的数据块需要在线程间进行划分;仔细观察所有数据访问的各个方面,以及确定性能问题产生的原因。各种领域中,出现问题的情况都很相似:改变划分方式就能够提高性能,而不需要对基本算法进行任何修改。

OK,我们已经了解了访问数组是如何对性能产生影响的。那么其他类型的数据结构呢?

8.3.2** 其他数据结构中的数据访问模式**

根本上讲,同样的考虑适用于想要优化数据结构的数据访问模式,就像优化对数组的访问:

· 尝试调整数据在线程间的分布,就能让同一线程中的数据紧密联系在一起。

· 尝试减少线程上所需的数据量。

· 尝试让不同线程访问不同的存储位置,以避免伪共享。

当然,应用于其他数据结构上会比较麻烦。例如,对二叉树划分就要比其他结构困难,有用与没用要取决于树的平衡性,以及需要划分的节点数量。同样,树的的属性决定了其节点会动态的进行分配,并且在不同的地方进行释放。

现在,节点在不同的地方释放倒不是一个严重的问题,不过这就意味着处理器需要在缓存中存储很多东西,这实际上是有好处的。当多线程需要旋转树的时候,就需要对树中的所有节点进行访问,不过当树中的节点只包括指向实际值的指针时,处理器只能从主存中对数据进行加载。如果数据正在被访问线程所修改,这就能避免节点数据,以及树数据结构间的伪共享。

这里就和用一个互斥量来保护数据类似了。假设你有一个简单的类,包含一些数据项和一个用于保护数据的互斥量(在多线程环境下)。如果互斥量和数据项在内存中很接近,对与一个需要获取互斥量的线程来说是很理想的情况;需要的数据可能早已存入处理器的缓存中了,因为在之前为了对互斥量进行修改,已经加载了需要的数据。不过,这还有一个缺点:当其他线程尝试锁住互斥量时(第一个线程还没有是释放),线程就能对对应的数据项进行访问。互斥锁是当做一个“读-改-写”原子操作实现的,对于相同位置的操作都需要先获取互斥量,如果互斥量已锁,那就会调用系统内核。这种“读-改-写”操作,可能会让数据存储在缓存中,让线程获取的互斥量变得毫无作用。从目前互斥量的发展来看,这并不是个问题;线程不会直到互斥量解锁,才接触互斥量。不过,当互斥量共享同一缓存行时,其中存储的是线程已使用的数据,这时拥有互斥量的线程将会遭受到性能打击,因为其他线程也在尝试锁住互斥量。

一种测试伪共享问题的方法是:对大量的数据块填充数据,让不同线程并发的进行访问。比如,你可以使用:

struct protected_data
{
std::mutex m;
char padding[65536]; // 65536字节已经超过一个缓存行的数量级
my_data data_to_protect;
};
用来测试互斥量竞争或
struct my_data
{
data_item1 d1;
data_item2 d2;
char padding[65536];
};
my_data some_array[256];

用来测试数组数据中的伪共享。如果这样能够提高性能,你就能知道伪共享在这里的确存在。

当然,在设计并发的时候有更多的数据访问模式需要考虑,现在让我们一起来看一些附加的注意事项。

8.4设计并发代码的注意事项****

目前为止,在本章中我们已经看到了很多线程间划分工作的方法,影响性能的因素,以及这些因素是如何影响你选择数据访问模式和数据结构的。虽然,已经有了很多设计并发代码的内容。你还需要考虑很多事情,比如异常安全和可扩展性。随着系统中核数的增加,性能越来越高(无论是在减少执行时间,还是增加吞吐率),这样的代码称为“可扩展”代码。理想状态下,性能随着核数的增加线性增长,也就是当系统有100个处理器时,其性能是系统只有1核时的100倍。

虽然,非扩展性代码依旧可以正常工作——单线程应用就无法扩展——例如,异常安全是一个正确性问题。如果你的代码不是异常安全的,最终会破坏不变量,或是造成条件竞争,亦或是你的应用意外终止,因为某个操作会抛出异常。有了这个想法,我们就率先来看一下异常安全的问题。

8.4.1** 并行算法中的异常安全**

异常安全是衡量C++代码一个很重要的指标,并发代码也不例外。实际上,相较于串行算法,并行算法常会格外要求注意异常问题。当一个操作在串行算法中抛出一个异常,算法只需要考虑对其本身进行处理,以避免资源泄露和损坏不变量;这里可以允许异常传递给调用者,由调用者对异常进行处理。通过对比,在并行算法中很多操作要运行在独立的线程上。在这种情况下,异常就不再允许被传播,因为这将会使调用堆栈出现问题。如果一个函数在创建一个新线程后带着异常退出,那么这个应用将会终止。

作为一个具体的例子,让我们回顾一下清单2.8中的parallel_accumulate函数:

清单8.2 std::accumulate的原始并行版本(源于清单2.8)

template
struct accumulate_block
{
void operator()(Iteratorfirst,Iterator last,T& result)
{
result=std::accumulate(first,last,result); // 1
}
};
template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long constlength=std::distance(first,last); // 2
if(!length)
return init;
unsigned long constmin_per_thread=25;
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
std::vectorresults(num_threads); //** 3**
std::vectorstd::threadthreads(num_threads-1); // 4
Iterator block_start=first; // 5
for(unsigned longi=0;i<(num_threads-1);++i)
{
Iteratorblock_end=block_start;  // **6**

std::advance(block_end,block_size);

threads[i]=std::thread(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i]));//**7**
block_start=block_end;  // **8**
}
accumulate_block()(block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
returnstd::accumulate(results.begin(),results.end(),init); // 10
}

现在让我们来看一下异常要在哪抛出:基本上就是在调用函数的地方抛出异常,或在用户定义类型上执行某个操作时可能抛出异常。

首先,需要调用distance②,其会对用户定义的迭代器类型进行操作。因为,这时还没有做任何事情,所以对于调用线程来说,所有事情都没问题。接下来,就需要分配results③和threads④。再后,调用线程依旧没有做任何事情,或产生新的线程,所以到这里也是没有问题的。当然,如果在构造threads抛出异常,那么对已经分配的results将会被清理,析构函数会帮你打理好一切。

跳过block_start⑤的初始化(因为也是安全的),来到了产生新线程的循环⑥⑦⑧。当在⑦处创建了第一个线程,如果再抛出异常,就会出问题的;对于新的std::thread对象将会销毁,程序将调用std::terminate来中断程序的运行。使用std::terminate的地方,可不是什么好地方。

accumulate_block⑨的调用就可能抛出异常,就会产生和上面类似的结果;线程对象将会被销毁,并且调用std::terminate。另一方面,最终调用std::accumulate⑩可能会抛出异常,不过处理起来没什么难度,因为所有的线程在这里已经汇聚回主线程了。

上面只是对于主线程来说的,不过还有很多地方会抛出异常:对于调用accumulate_block的新线程来说就会抛出异常①。没有任何catch块,所以这个异常不会被处理,并且当异常发生的时候会调用std::terminater()来终止应用的运行。

也许这里的异常问题并不明显,不过这段代码是非异常安全的。

添加异常安全

好吧,我们已经确定所有抛出异常的地方了,并且知道异常所带来的恶性后果。能为其做些什么呢?就让我们来解决一下在新线程上的异常问题。

在第4章时已经使用过工具来做这件事。如果你仔细的了解过新线程用来完成什么样的工作,要返回一个计算的结果的同时,允许代码产生异常。这可以将std::packaged_task和std::future相结合,来解决这个问题。如果使用std::packaged_task重新构造代码,代码可能会是如下模样。

清单8.3 使用std::packaged_task的并行std::accumulate

template
struct accumulate_block
{
T operator()(Iteratorfirst,Iterator last) // 1
{
returnstd::accumulate(first,last,T());  // **2**
}
};
template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long constlength=std::distance(first,last);
if(!length)
return init;
unsigned long constmin_per_thread=25;
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
std::vector futures(num_threads-1); // 3
std::vectorstd::threadthreads(num_threads-1);
Iterator block_start=first;
for(unsigned longi=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task task(accumulate_block());//4
futures[i]=task.get_future(); // 5
threads[i]=std::thread(std::move(task),block_start,block_end); //** 6**
block_start=block_end;
}
Tlast_result=accumulate_block()(block_start,last); //** 7**
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
T result=init; // 8
for(unsigned longi=0;i<(num_threads-1);++i)
{
result+=futures[i].get();  //** 9**
}
result += last_result; // 10
return result;
}

第一个修改就是调用accumulate_block的操作现在就是直接将结果返回,而非使用引用将结果存储在某个地方①。使用std::packaged_task和std::future是线程安全的,所以你可以使用它们来对结果进行转移。当调用std::accumulate②时,需要你显示传入T的默认构造函数,而非复用result的值,不过这只是一个小改动。

下一个改动就是,不用向量来存储结果,而使用futures向量为每个新生线程存储std::future③。在新线程生成循环中,首先要为accumulate_block创建一个任务④。std::packaged_task声明,需要操作的两个Iterators和一个想要获取的T。然后,从任务中获取future⑤,再将需要处理的数据块的开始和结束信息传入⑥,让新线程去执行这个任务。当任务执行时,future将会获取对应的结果,以及任何抛出的异常。

使用future,就不能获得到一组结果数组,所以需要将最终数据块的结果赋给一个变量进行保存⑦,而非对一个数组进行填槽。同样,因为需要从future中获取结果,使用简单的for循环,就要比使用std::accumulate好的多;循环从提供的初始值开始⑧,并且将每个future上的值进行累加⑨。如果相关任务抛出一个异常,那么异常就会被future捕捉到,并且使用get()的时候获取数据时,这个异常会再次抛出。最后,在返回结果给调用者之前,将最后一个数据块上的结果添加入结果中⑩。

这样,一个问题就已经解决:在工作线程上抛出的异常,可以在主线程上抛出。如果不止一个工作线程抛出异常,那么只有一个能在主线程中抛出,不过这不会有产生太大的问题。如果这个问题很重要,你可以使用类似std::nested_exception来对所有抛出的异常进行捕捉。

剩下的问题就是,当生成第一个新线程和当所有线程都汇入主线程时,抛出异常;这样会让线程产生泄露。最简单的方法就是捕获所有抛出的线程,汇入的线程依旧是joinable()的,并且会再次抛出异常:

try
{
for(unsigned longi=0;i<(num_threads-1);++i)
{
// ... as before
}
Tlast_result=accumulate_block()(block_start,last);
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
}
catch(…)
{
for(unsigned longi=0;i<(num_thread-1);++i)
{
if(threads[i].joinable())
thread[i].join();
}
throw;
}

现在好了,无论线程如何离开这段代码,所有线程都可以被汇入。不过,try-catch很不美观,并且这里有重复代码。可以将“正常”控制流上的线程在catch块上执行的线程进行汇入。重复代码是没有必要的,因为这就意味着更多的地方需要改变。不过,现在让我们来提取一个对象的析构函数;毕竟,析构函数是C++中处理资源的惯用方式。看一下你的类:

class join_threads
{
std::vectorstd::thread& threads;
public:
explicitjoin_threads(std::vectorstd::thread& threads_):threads(threads_){}
~join_threads()
{
for(unsigned longi=0;i<threads.size();++i)
{
  if(threads[i].joinable())
    threads[i].join();
}

}

};

这个类和在清单2.3中看到的thread_guard类很相似,除了使用向量的方式来扩展线程量。用这个类简化后的代码如下所示:

清单8.4 异常安全版std::accumulate

template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long constlength=std::distance(first,last);
if(!length)
return init;
unsigned long constmin_per_thread=25;
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
std::vector futures(num_threads-1);
std::vectorstd::threadthreads(num_threads-1);
join_threads joiner(threads); //** 1**
Iterator block_start=first;
for(unsigned longi=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);

std::packaged_tasktask(accumulate_block());

futures[i]=task.get_future();

threads[i]=std::thread(std::move(task),block_start,block_end);

block_start=block_end;
}
T last_result=accumulate_block()(block_start,last);
T result=init;
for(unsigned longi=0;i<(num_threads-1);++i)
{
result+=futures[i].get();  //** 2**
}
result += last_result;
return result;
}

当创建了线程容器,就对新类型创建了一个实例①,可让退出线程进行汇入。然后,可以再显式的汇入循环中将线程删除,在原理上来说是安全的:因为线程,无论怎么样退出,都需要汇入主线程。注意这里对futures[i].get()②的调用,将会阻塞线程,直到结果准备就绪,所以这里不需要显式的将线程进行汇入。和清单8.2中的原始代码不同:原始代码中,你需要将线程汇入,以确保results向量被正确填充。不仅需要异常安全的代码,还需要较短的函数实现,因为这里已经将汇入部分的代码放到新(可复用)类型中去了。

**std::async()**的异常安全

现在,你已经了解了,当需要显式管理线程的时候,需要代码是异常安全的。那现在让我们来看一下使用std::async()是怎么样完成异常安全的。在本例中,标准库对线程进行了较好的管理,并且当“期望”处以就绪状态的时候,就能生成一个新的线程。对于异常安全,还需要注意一件事,如果在没有等待的情况下对“期望”实例进行销毁,析构函数会等待对应线程执行完毕后才执行。这就能桥面的必过线程泄露的问题,因为线程还在执行,且持有数据的引用。下面的代码将展示使用std::async()完成异常安全的实现。

清单8.5 异常安全并行版std::accumulate——使用std::async()

template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long constlength=std::distance(first,last); // 1
unsigned long constmax_chunk_size=25;
if(length<=max_chunk_size)
{
returnstd::accumulate(first,last,init);  //** 2**
}
else
{
Iterator mid_point=first;

std::advance(mid_point,length/2); //** 3**

std::future<T>first_half_result=std::async(parallel_accumulate<Iterator,T>,first,mid_point,init);// **4**
Tsecond_half_result=parallel_accumulate(mid_point,last,T());  // **5**
returnfirst_half_result.get()+second_half_result; // **6**

}

}

这个版本对数据进行递归划分,而非在预计算后对数据进行分块;因此,这个版本要比之前的版本简单很多,并且这个版本也是异常安全的。和之前一样,一开始要确定序列的长度①,如果其长度小于数据块包含数据的最大数量,那么可以直接调用std::accumulate②。如果元素的数量超出了数据块包含数据的最大数量,那么就需要找到数量中点③,将这个数据块分成两部分,然后再生成一个异步任务对另一半数据进行处理④。第二半的数据是通过直接的递归调用来处理的⑤,之后将两个块的结果加和到一起⑥。标准库能保证std::async的调用能够充分的利用硬件线程,并且不会产生线程的超额认购,一些“异步”调用是在调用get()⑥后同步执行的。

优雅的地方,不仅在于利用硬件并发的优势,并且还能保证异常安全。如果有异常在递归调用⑤中抛出,通过调用std::async④所产生的“期望”,将会在异常传播时被销毁。这就需要依次等待异步任务的完成,因此也能避免悬空线程的出现。另外,当异步任务抛出异常,且被future所捕获,在对get()⑥调用的时候,future中存储的异常,会再次抛出。

除此之外,在设计并发代码的时候还要考虑哪些其他因素?让我们来看一下扩展性 (scalability)。随着系统中核数的增加,应用性能如何提升?

8.4.2** 可扩展性和Amdahl定律**

扩展性代表了应用利用系统中处理器执行任务的能力。一种极端就是将应用写死为单线程运行,这种应用就是完全不可扩展的;即使添加了100个处理器到你的系统中,应用的性能都不会有任何改变。另一种就是像SETI@Home[3]项目一样,让应用使用系统中成千上万的处理器(以个人电脑的形式加入网络的用户)成为可能。

对于任意的多线程程序,在程序运行的时候,运行的工作线程数量会有所不同。应用初始阶段只有一个线程,之后会在这个线程上衍生出新的线程。理想状态:每个线程都做着有用的工作,不过这种情况几乎是不可能发生的。线程通常会花时间进行互相等待,或等待I/O操作的完成。

一种简化的方式就是就是将程序划分成“串行”部分和“并行”部分。串行部分:只能由单线程执行一些工作的地方。并行部分:可以让所有可用的处理器一起工作的部分。当在多处理系统上运行你的应用时,“并行”部分理论上会完成的相当快,因为其工作被划分为多份,放在不同的处理器上执行。“串行”部分则不同,还是只能一个处理器执行所有工作。这样(简化)假设下,就可以对随着处理数量的增加,估计一下性能的增益:当程序“串行”部分的时间用fs来表示,那么性能增益§就可以通过处理器数量(N)进行估计:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n82596J8-1657616056596)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image008.gif)]

这就是Amdahl定律,在讨论并发程序性能的时候都会引用到的公式。如果每行代码都能并行化,串行部分就为0,那么性能增益就为N。或者,当串行部分为1/3时,当处理器数量无限增长,你都无法获得超过3的性能增益。

Amdahl定律明确了,对代码最大化并发可以保证所有处理器都能用来做有用的工作。如果将“串行”部分的减小,或者减少线程的等待,就可以在多处理器的系统中获取更多的性能收益。或者,当能提供更多的数据让系统进行处理,并且让并行部分做最重要的工作,就可以减少“串行”部分,以获取更高的性能增益。

扩展性:当有更多的处理器加入时,减少一个动作的执行时间,或在给定时间内做更多工作。有时这两个指标是等价的(如果处理器的速度相当快,那么就可以处理更多的数据),有时不是。选择线程间的工作划分的技术前,辨别哪些方面是能否扩展的就十分的重要。

本节开始已经提到,线程并非任何时候都做的是有用的工作。有时,它们会等待其他线程,或者等待I/O完成,亦或是等待其他的事情。如果线程在等待的时候,系统中还有必要的任务需要完成时,就可以将等待“隐藏”起来。

8.4.3** 使用多线程隐藏延迟**

之前讨论了很多有关多线程性能的话题。现在假设,线程在一个处理器上运行时不会偷懒,并且做的工作都很有用。当然,这只是假设;在实际应用中,线程会经常因为等待某些事情而阻塞。

不论等待的理由是什么,如果有和系统中物理单元相同数量的线程,那么线程阻塞就意味着在等待CPU时间片。处理器将会在阻塞的时间内运行另一个线程,而不是什么事情都不做。因此,当知道一些线程需要像这样耗费相当一段时间进行等待时,可以利用CPU的空闲时间去运行一个或多个线程。

试想一个病毒扫描程序,使用流水线对线程间的工作进行划分。第一个线程对文件系统中的文件进行检查,并将它们放入一个队列中。同时,另一个线程从队列中获取文件名,加载文件,之后对它们进行病毒扫描。线程对文件系统中的文件进行扫描就会受到I/O操作的限制,所以可以通过执行额外的扫描线程,充分利用CPU的“空闲”时间。这时还需要一个文件搜索线程,以及足够多的扫描线程。当扫描线程为了扫描文件,还要从磁盘上读取到重要部分的文件时,就能体会到多扫描线程的意义所在了。不过,在某些时候线程也过于多,系统将会因为越来越多的任务切换而降低效率,就像8.2.5节描述的那样。

同之前一样,这也是一种优化,对修改(线程数量)前后性能的测量很重要;优化的线程数量高度依赖要完成工作的先天属性,以及等待时间所占的百分比。

应用可能不用额外的线程,而使用CPU的空闲时间。例如,如果一个线程因为I/O操作被阻塞,这个线程可能会使用异步I/O(如果可以用的话),当I/O操作在后台执行完成后,线程就可以做其他有用的工作了。在其他情况下,当一个线程等待其他线程去执行一个操作时,比起阻塞,不如让阻塞线程自己来完成这个操作,就像在第7章中看到的无锁队列那样。在一个极端的例子中,当一个线程等待一个任务完成,并且这个任务还没有被其他任何线程所执行时,等待线程就可以执行这个任务,或执行另一个不完整的任务。在清单8.1中看到这样的例子,排序函数持续的尝试对数据进行排序,即使那些数据已经不需要排序了。

比起添加线程数量让其对处理器进行充分利用,有时也要在增加线程的同时,确保外部事件被及时的处理,以提高系统的响应能力。

8.4.4** 使用并发提高响应能力**

很多流行的图形化用户接口框架都是事件驱动型(event driven);对图形化接口进行操作是通过按下按键或移动鼠标进行,将产生一系列需要应用处理的事件或信息。系统也可能产生信息或事件。为了确定所有事件和信息都能被正确的处理,应用通常会有一个事件循环,就像下面的代码:

while(true)
{
event_data event=get_event();
if(event.type==quit)
break;
process(event);
}

显然,API中的细节可能不同,不过结构通常是一样的:等待一个事件,对其做必要的处理,之后等待下一个事件。如果是一个单线程应用,那么就会让长期任务很难书写,如同在8.1.3节中所描述。为了确保用户输入被及时的处理,无论应时在做些什么,get_event()和process()必须以合理的频率调用。这就意味着任务要被周期性的悬挂,并且返回到事件循环中,或get_event()/process()必须在一个合适地方进行调用。每个选项的复杂程度取决于任务的实现方式。

通过使用并发分离关注,可以将一个很长的任务交给一个全新的线程,并且留下一个专用的GUI线程来处理这些事件。线程可以通过简单的机制进行通讯,而不是将事件处理代码和任务代码混在一起。下面的例子就是展示了这样的分离。

清单8.6 将GUI线程和任务线程进行分离

std::thread task_thread;
std::atomic task_cancelled(false);
void gui_thread()
{
while(true)
{
event_data event=get_event();
if(event.type==quit)
  break;
process(event);
}
}
void task()
{
while(!task_complete() &&!task_cancelled)
{
do_next_operation();
}
if(task_cancelled)
{
perform_cleanup();
}
else
{
post_gui_event(task_complete);
}
}
void process(event_data const& event)
{
switch(event.type)
{
case start_task:
task_cancelled=false;
task_thread=std::thread(task);
break;

case stop_task:

task_cancelled=true;
task_thread.join();
break;

case task_complete:

task_thread.join();
display_results();
break;

default:

//...

}

}

通过这种方式对关注进行分离,用户线程将总能及时的对事件进行响应,及时完成任务需要花费很长事件。使用应用的时候,响应事件通常也是影响用户体验的重要一点;无论是特定操作被不恰当的执行(无论是什么操作),应用都会被完全锁住。通过使用专门的事件处理线程,GUI就能处理GUI指定的信息了(比如对于调整窗口的大小或颜色),而不需要中断处理器,进行耗时的处理;同时,还能向长期任务传递相关的信息。

现在,你可以将本章中在设计并发代码时要考虑的所有问题进行一下回顾。作为一个整体,它们都很具有代表性,不过当你熟练的使用“多线程编程”时,考虑其中的很多问题将变成你习惯。如果你是初学者,我希望这些例子能让你明白,这些问题是如何影响多线程代码的。

8.5在实践中设计并发代码****

当为一个特殊的任务设计并发代码时,需要根据任务本身来考虑之前所提到的问题。为了展示以上的注意事项是如何应用的,我们将看一下在C++标准库中三个标准函数的并行实现。当你遇到问题时,这里的例子可以作为很好的参照。在有较大的并发任务进行辅助下,我们也将实现一些函数。

我主要演示这些实现使用的技术,不过可能这些技术并不是最先进的;更多优秀的实现可以更好的利用硬件并发,不过这些实现可能需要到与并行算法相关的学术文献,或者是多线程的专家库中(比如:Inter的TBB[4])才能看到。

并行版的std::for_each可以看作为能最直观体现并行概念,就让我们从并行版的std::for_each开始吧!

8.5.1** **并行实现:std::for_each

std::for_each的原理很简单:其对某个范围中的元素,依次调用用户提供的函数。并行和串行调用的最大区别就是函数的调用顺序。std::for_each是对范围中的第一个元素调用用户函数,接着是第二个,以此类推,而在并行实现中对于每个元素的处理顺序就不能保证了,并且它们可能(我们希望如此)被并发的处理。

为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。你事先知道元素数量,所以可以处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std::thread::hardware_concurrency()来决定线程的数量。同样,这些元素都能被独立的处理,所以可以使用连续的数据块来避免伪共享(详见8.2.3节)。

这里的算法有点类似于并行版的std::accumulate(详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了一个指定功能的函数。因为不需要返回结果,可以假设这可能会对简化代码,不过想要将异常传递给调用者,就需要使用std::packaged_task和std::future机制对线程中的异常进行转移。这里展示一个样本实现。

清单8.7 并行版std::for_each

template
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long constmin_per_thread=25;
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
std::vector futures(num_threads-1); // 1
std::vectorstd::threadthreads(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned longi=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task task( // 2
[=]()
  {
   std::for_each(block_start,block_end,f);
  });
futures[i]=task.get_future();

threads[i]=std::thread(std::move(task)); //** 3**

block_start=block_end;
}
std::for_each(block_start,last,f);
for(unsigned longi=0;i<(num_threads-1);++i)
{
futures[i].get();  // **4**

}

}

代码结构与清单8.4的差不多。最重要的不同在于futures向量对std::future类型①变量进行存储,因为工作线程不会返回值,并且简单的lambda函数会对block_start到block_end上的任务②执行f函数。这是为了避免传入线程的构造函数③。当工作线程不需要返回一个值时,调用futures[i].get()④只是提供检索工作线程异常的方法;如果不想把异常传递出去,就可以省略这一步。

实现并行std::accumulate的时候,使用std::async会简化代码;同样,parallel_for_each也可以使用std::async。实现如下所示。

清单8.8 使用std::async实现std::for_each

template
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long constmin_per_thread=25;
if(length<(2*min_per_thread))
{
std::for_each(first,last,f); // 1
}
else
{
Iterator constmid_point=first+length/2;
std::future<void> first_half=std::async(&parallel_for_each<Iterator,Func>,first,mid_point,f);  // **2**

parallel_for_each(mid_point,last,f); // 3

first_half.get();  // **4**

}

}

和基于std::async的parallel_accumulate(清单8.5)一样,是在运行时对数据进行迭代划分的,而非在执行前划分好,这是因为你不知道你的库需要使用多少个线程。像之前一样,当你将每一级的数据分成两部分,异步执行另外一部分②,剩下的部分就不能再进行划分了,所以直接运行这一部分③;这样就可以直接对std::for_each①进行使用了。这里再次使用std::async和std::future的get()成员函数④来提供对异常的传播。

回到算法,函数需要对每一个元素执行同样的操作(这样的操作有很多种,初学者可能会想到std::count和std::replace),一个稍微复杂一些的例子就是使用std::find。

8.5.2** **并行实现:std::find

接下来是std::find算法,因为这是一种不需要对数据元素做任何处理的算法。比如,当第一个元素就满足查找标准,那就没有必要对其他元素进行搜索了。将会看到,算法属性对于性能具有很大的影响,并且对并行实现的设计有着直接的影响。这个算法是一个很特别的例子,数据访问模式都会对代码的设计产生影响(详见8.3.2节)。该类中的另一些算法包括std::equal和std::any_of。

当你和妻子或者搭档,在一个纪念盒中找寻一张老照片,当找到这张照片时,就不会再看另外的照片了。不过,你得让其他人知道你已经找到照片了(比如,大喊一声“找到了!”),这样其他人就会停止搜索了。很多算法的特性就是要对每一个元素进行处理,所以它们没有办法像std::find一样,一旦找到合适数据就停止执行。因此,你需要设计代码对其进行使用——当得到想要的答案就中断其他任务的执行,所以不能等待线程处理对剩下的元素进行处理。

如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。如果系统能支持四个并发线程,那么每个线程就可以对总数据量的1/4进行检查,并且在我们的实现只需要单核完成的1/4的时间,就能完成对所有元素的查找。如果匹配的元素在第一个1/4块中,串行算法将会返回第一个,因为算法不需要对剩下的元素进行处理了。

一种办法,中断其他线程的一个办法就是使用一个原子变量作为一个标识,在处理过每一个元素后就对这个标识进行检查。如果标识被设置,那么就有线程找到了匹配元素,所以算法就可以停止并返回了。用这种方式来中断线程,就可以将那些没有处理的数据保持原样,并且在更多的情况下,相较于串行方式,性能能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。

如何返回值和传播异常呢?现在你有两个选择。你可以使用一个future数组,使用std::packaged_task来转移值和异常,在主线程上对返回值和异常进行处理;或者使用std::promise对工作线程上的最终结果直接进行设置。这完全依赖于你想怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用std::promise对异常和最终值进行设置。另外,如果想要让其他工作线程继续查找,可以使用std::packaged_task来存储所有的异常,当线程没有找到匹配元素时,异常将再次抛出。

这种情况下,我会选择std::promise,因为其行为和std::find更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。因此,在所有线程结束前,获取future上的结果。如果被future阻塞住,所要查找的值不在范围内,就会持续的等待下去。实现代码如下。

清单8.9 并行find算法实现

template
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
struct find_element // 1
{
void operator()(Iteratorbegin,Iterator end, MatchType match,
               std::promise<Iterator>* result,std::atomic<bool>* done_flag)
{
  try
  {
    for(;(begin!=end) &&!done_flag->load();++begin)  // **2**
    {
      if(*begin==match)
      {
       result->set_value(begin);  // **3**
       done_flag->store(true);  // **4**
        return;
      }
    }
  }
  catch(...)  //** 5**
  {
    try
    {
      result->set_exception(std::current_exception());  // **6**
      done_flag->store(true);
    }
    catch(...)  //** 7**
    {}
  }
}
};
unsigned long constlength=std::distance(first,last);
if(!length)
return last;
unsigned long constmin_per_thread=25;
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
std::promiseresult; //** 8**
std::atomicdone_flag(false); //** 9**
std::vectorstd::threadthreads(num_threads-1);
{
join_threads joiner(threads); //** 10**
Iterator block_start=first;
for(unsigned longi=0;i<(num_threads-1);++i)
{
  Iteratorblock_end=block_start;
 std::advance(block_end,block_size);
  threads[i]=std::thread(find_element(),block_start,block_end,match,&result,&done_flag); // **11**
  block_start=block_end;
}
find_element()(block_start,last,match,&result,&done_flag); // 12
}
if(!done_flag.load()) //13
{
return last;
}
returnresult.get_future().get(); // 14
}

清单8.9中的函数主体与之前的例子相似。这次,由find_element类①的函数调用操作实现,来完成查找工作的。循环通过在给定数据块中的元素,检查每一步上的标识②。如果匹配的元素被找到,就将最终的结果设置到promise③当中,并且在返回前对done_flag④进行设置。

如果有一个异常被抛出,那么它就会被通用处理代码⑤捕获,并且在promise⑥尝中试存储前,对done_flag进行设置。如果对应promise已经被设置,设置在promise上的值可能会抛出一个异常,所以这里⑦发生的任何异常,都可以捕获并丢弃。

这意味着,当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_flag被设置,那么其他线程将会终止。如果多线程同时找到匹配值或抛出异常,它们将会对promise产生竞争。不过,这是良性的条件竞争;因为,成功的竞争者会作为“第一个”返回线程,因此这个结果可以接受。

回到parallel_find函数本身,其拥有用来停止搜索的promise⑧和标识⑨;随着对范围内的元素的查找⑪,promise和标识会传递到新线程中。主线程也使用find_element来对剩下的元素进行查找⑫。像之前提到的,需要在全部线程结束前,对结果进行检查,因为结果可能是任意位置上的匹配元素。这里将“启动-汇入”代码放在一个块中⑩,所以所有线程都会在找到匹配元素时⑬进行汇入。如果找到匹配元素,就可以调用std::future(来自promise⑭)的成员函数get()来获取返回值或异常。

不过,这里假设你会使用硬件上所有可用的的并发线程,或使用其他机制对线程上的任务进行提前划分。就像之前一样,可以使用std::async,以及递归数据划分的方式来简化实现(同时使用C++标准库中提供的自动缩放工具)。使用std::async的parallel_find实现如下所示。

清单8.10 使用std::async实现的并行find算法

template // 1
Iterator parallel_find_impl(Iterator first,Iterator last,MatchTypematch,std::atomic& done)
{
try
{
unsigned long constlength=std::distance(first,last);
unsigned long constmin_per_thread=25;  // **2**

if(length<(2*min_per_thread)) // 3

{
  for(;(first!=last) &&!done.load();++first)  // **4**
  {
    if(*first==match)
    {
      done=true;  //** 5**
      return first;
    }
  }
  return last;  // **6**
}
else
{ 
  Iterator constmid_point=first+(length/2);  // **7**
  std::future<Iterator>async_result=
   std::async(&parallel_find_impl<Iterator,MatchType>, mid_point,last,match,std::ref(done));  //** 8**
  Iterator constdirect_result=parallel_find_impl(first,mid_point,match,done);  // 9
  return(direct_result==mid_point)?async_result.get():direct_result;  // 10
}
}
catch(…)
{
done=true;  // 11
throw;
}
}
template
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
std::atomicdone(false);
returnparallel_find_impl(first,last,match,done); // 12
}

如果想要在找到匹配项时结束,就需要在线程之间设置一个标识来表明匹配项已经被找到。因此,需要将这个标识递归的传递。通过函数①的方式来实现是最简单的办法,只需要增加一个参数——一个done标识的引用,这个表示通过程序的主入口点传入⑫。

核心实现和之前的代码一样。通常函数的实现中,会让单个线程处理最少的数据项②;如果数据块大小不足于分成两半,就要让当前线程完成所有的工作了③。实际算法在一个简单的循环当中(给定范围),直到在循环到指定范围中的最后一个,或找到匹配项,并对标识进行设置④。如果找到匹配项,标识done就会在返回前进行设置⑤。无论是因为已经查找到最后一个,还是因为其他线程对done进行了设置,都会停止查找。如果没有找到,会将最后一个元素last进行返回⑥。

如果给定范围可以进行划分,首先要在st::async在对第二部分进行查找⑧前,要找数据中点⑦,而且需要使用std::ref将done以引用的方式传递。同时,可以通过对第一部分直接进行递归查找。两部分都是异步的,并且在原始范围过大时,直接递归查找的部分可能会再细化。

如果直接查找返回的是mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果。如果在另一半中没有匹配项的话,返回的结果就一定是last,这个值的返回就代表了没有找到匹配的元素⑩。如果“异步”调用被延迟(非真正的异步),那么实际上这里会运行get();这种情况下,如果对下半部分的元素搜索成功,那么就不会执行对上半部分元素的搜索了。如果异步查找真实的运行在其他线程上,那么async_result变量的析构函数将会等待该线程完成,所以这里不会有线程泄露。

像之前一样,std::async可以用来提供“异常-安全”和“异常-传播”特性。如果直接递归抛出异常,future的析构函数就能让异步执行的线程提前结束;如果异步调用抛出异常,那么这个异常将会通过对get()成员函数的调用进行传播⑩。使用try/catch块只能捕捉在done发生的异常,并且当有异常抛出⑪时,所有线程都能很快的终止运行。不过,不使用try/catch的实现依旧没问题,不同的就是要等待所有线程的工作是否完成。

实现中一个重要的特性就是,不能保证所有数据都能被std::find串行处理。其他并行算法可以借鉴这个特性,因为要让一个算法并行起来这是必须具有的特性。如果有顺序问题,元素就不能并发的处理了。如果每个元素独立,虽然对于parallel_for_each不是很重要,不过对于parallel_find,即使在开始部分已经找到了匹配元素,也有可能返回范围中最后一个元素;如果在知道结果的前提下,这样的结果会让人很惊讶。

OK,现在你已经使用了并行化的std::find。如在本节开始说的那样,其他相似算法不需要对每一个数据元素进行处理,并且同样的技术可以使用到这些类似的算法上去。我们将在第9章中看到“中断线程”的问题。

为了完成我们的并行“三重奏”,我们将换一个角度来看一下std::partial_sum。对于这个算法,没有太多的文献可参考,不过让这个算法并行起来是一件很有趣的事。

8.5.3** **并行实现:std::partial_sum

std::partial_sum会计算给定范围中的每个元素,并用计算后的结果将原始序列中的值替换掉。比如,有一个序列[1,2,3,4,5],在执行该算法后会成为:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣,因为这里不能讲任务分块,对每一块进行独立的计算。比如,原始序列中的第一个元素需要加到后面的一个元素中去。

确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。

将原始数据分割成块,加上之前块的部分和就能够并行了。如果每个块中的末尾元素都是第一个被更新的,那么块中其他的元素就能被其他线程所更新,同时另一个线程对下一块进行更新,等等。当处理的元素比处理核心的个数多的时候,这样完成工作没问题,因为每一个核芯在每一个阶段都有合适的数据可以进行处理。

如果有很多的处理器(就是要比处理的元素个数多),那么之前的方式就无法正常工作了。如果还是将工作划分给每个处理器,那么在第一步就没必要去做了。这种情况下,传递结果就意味着让处理器进行等待,这时需要给这些处于等待中的处理器一些工作。所以,可以采用完全不同的方式来处理这个问题。比起将数据块中的最后一个元素的结果向后面的元素块传递,可以对部分结果进行传播:第一次与相邻的元素(距离为1)相加和(和之前一样),之后和距离为2的元素相加,在后来和距离为4的元素相加,以此类推。比如,初始序列为[1,2,3,4,5,6,7,8,9],第一次后为[1,3,5,7,9,11,13,15,17],第二次后为[1,3,6,10,14,18, 22,26,30],下一次就要隔4个元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8个元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],这就是最终的结果。虽然,比起第一种方法多了很多步骤,不过在可并发平台下,这种方法提高了并行的可行性;每个处理器可在每一步中处理一个数据项。

总体来说,当有N个操作时(每步使用一个处理器)第二种方法需要log(N)[底为2]步;在本节中,N就相当于数据链表的长度。比起第一种,每个线程对分配块做N/k个操作,然后在做N/k次结果传递(这里的k是线程的数量)。因此,第一种方法的时间复杂度为O(N),不过第二种方法的时间复杂度为Q(Nlog(N))。当数据量和处理器数量相近时,第二种方法需要每个处理器上log(N)个操作,第一种方法中每个处理器上执行的操作数会随着k的增加而增多,因为需要对结果进行传递。对于处理单元较少的情况,第一种方法会比较合适;对于大规模并行系统,第二种方法比较合适。

不管怎么样,先将效率问题放一边,让我们来看一些代码。下面清单实现的,就是第一种方法。

清单8.11 使用划分的方式来并行的计算部分和

template
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typenameIterator::value_type value_type;
struct process_chunk // 1
{
void operator()(Iteratorbegin,Iterator last,std::future<value_type>* previous_end_value,
                 std::promise<value_type>* end_value)
{
  try
  {
    Iterator end=last;
    ++end;
   std::partial_sum(begin,end,begin); // **2**
    if(previous_end_value)  // **3**
    {
      value_type&addend=previous_end_value->get();  // **4**
      *last+=addend;  // **5**
      if(end_value)
      {
       end_value->set_value(*last); // **6**
      }
      std::for_each(begin,last,[addend](value_type&item){ item+=addend; });// **7**
     }
     else if(end_value)
     {
      end_value->set_value(*last); // **8**
     }
   }
   catch(...)  // **9**
   {
     if(end_value)
     {
      end_value->set_exception(std::current_exception());  // **10**
     }
     else
     {
       throw;  // **11**
     }
   }
 }
};
unsigned long constlength=std::distance(first,last);
if(!length)
return last;
unsigned long constmin_per_thread=25; // 12
unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long consthardware_threads=std::thread::hardware_concurrency();
unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long constblock_size=length/num_threads;
typedef typenameIterator::value_type value_type;
std::vectorstd::threadthreads(num_threads-1); // 13
std::vector > end_values(num_threads-1); // 14
std::vector > previous_end_values; //** 15**
previous_end_values.reserve(num_threads-1); // 16
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned longi=0;i<(num_threads-1);++i)
{
Iterator block_last=block_start;

std::advance(block_last,block_size-1); // 17

threads[i]=std::thread(process_chunk(),block_start,block_last,

(i!=0)?&previous_end_values[i-1]:0,&end_values[i]); // 18

block_start=block_last;
++block_start;  //** 19**
previous_end_values.push_back(end_values[i].get_future()); // 20
}
Iteratorfinal_element=block_start;
std::advance(final_element,std::distance(block_start,last)-1); // 21
process_chunk()(block_start,final_element,(num_threads>1)?&previous_end_values.back():0, 0); //** 22**
}

这个实现中,使用的结构体和之前算法中的一样,将问题进行分块解决,每个线程处理最小的数据块⑫。其中,有一组线程⑬和一组promise⑭,用来存储每块中的最后一个值;并且实现中还有一组future⑮,用来对前一块中的最后一个值进行检索。可以为future⑯做些储备,以避免生成新线程时,再分配内存。

主循环和之前一样,不过这次是让迭代器指向了每个数据块的最后一个元素,而不是作为一个普通值传递到最后⑰,这样就方便向其他块传递当前块的最后一个元素了。实际处理是在process_chunk函数对象中完成的,这个结构体看上去不是很长;当前块的开始和结束迭代器和前块中最后一个值的future一起,作为参数进行传递,并且promise用来保留当前范围内最后一个值的原始值⑱。

生成新的线程后,就对开始块的ID进行更新,别忘了传递最后一个元素⑲,并且将当前块的最后一个元素存储到future,上面的数据将在循环中再次使用到⑳。

在处理最后一个数据块前,需要获取之前数据块中最后一个元素的迭代器(21),这样就可以将其作为参数传入process_chunk(22)中了。std::partial_sum不会返回一个值,所以在最后一个数据块被处理后,就不用再做任何事情了。当所有线程的操作完成时,求部分和的操作也就算完成了。

OK,现在来看一下process_chunk函数对象①。对于整块的处理是始于对std::partial_sum的调用,包括对于最后一个值的处理②,不过得要知道当前块是否是第一块③。如果当前块不是第一块,就会有一个previous_end_value值从前面的块传过来,所以这里需要等待这个值的产生④。为了将算法最大程度的并行,首先需要对最后一个元素进行更新⑤,这样你就能将这个值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作,就可以使用std::for_each和简单的lambda函数⑦对剩余的数据项进行更新。

如果previous_end_value值为空,当前数据块就是第一个数据块,所以只需要为下一个数据块更新end_value⑧(如果有下一个数据块的话——当前数据块可能是唯一的数据块)。

最后,如果有任意一个操作抛出异常,就可以将其捕获⑨,并且存入promise⑩,如果下一个数据块尝试获取前一个数据块的最后一个值④时,异常会再次抛出。处理最后一个数据块时,异常会全部重新抛出⑪,因为抛出动作一定会在主线程上进行。

因为线程间需要同步,这里的代码就不容易使用std::async重写。任务等待会让线程中途去执行其他的任务,所以所有的任务必须同时执行。

基于块,以传递末尾元素值的方法就介绍到这里,让我们来看一下第二种计算方式。

实现以2的幂级数为距离部分和算法

第二种算法通过增加距离的方式,让更多的处理器充分发挥作用。在这种情况下,没有进一步同步的必要了,因为所有中间结果都直接传递到下一个处理器上去了。不过,在实际中我们很少见到,单个处理器处理对一定数量的元素执行同一条指令,这种方式成为*单指令**-*多数据流(SIMD)。因此,代码必须能处理通用情况,并且需要在每步上对线程进行显式同步。

完成这种功能的一种方式是使用栅栏(barrier)——一种同步机制:只有所有线程都到达栅栏处,才能进行之后的操作;先到达的线程必须等待未到达的线程。C++11标准库没有直接提供这样的工具,所以你得自行设计一个。

试想游乐场中的过山车。如果有适量的游客在等待,那么过山车管理员就要保证,在过山车启动前,每一个位置都得坐一个游客。栅栏的工作原理也一样:你已经知道了“座位”的数量,线程就是要等待所有“座位”都坐满。当等待线程够数,那么它们可以继续运行;这时,栅栏会重置,并且会让下一拨线程开始扥带。通常,会在循环中这样做,当同一个线程再次到达栅栏处,它会再次等待。这种方法是为了让线程同步,所以不会有线程在其他未完成的情况下,就去完成下一个任务。如果有线程提前执行,对于这样一个算法,就是一场灾难,因为提前出发的线程可能会修改要被其他线程使用到的数据,后面线程获取到的数据就不是正确数据了。

下面的代码就简单的实现了一个栅栏。

清单8.12 简单的栅栏类

class barrier
{
unsigned const count;
std::atomicspaces;
std::atomicgeneration;
public:
explicit barrier(unsigned count_):count(count_),spaces(count),generation(0){}// 1
void wait()
{
unsigned constmy_generation=generation;  // **2**

if(!--spaces)  //** 3**
{
  spaces=count;  //** 4**
  ++generation;  //** 5**
}
else
{
 while(generation==my_generation) // **6**
    std::this_thread::yield();  // **7**
}

}

};

这个实现中,用一定数量的“座位”构造了一个barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,以向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,那么线程会继续等待。这个实现使用了一个简单的自旋锁⑥,对generation的检查会在wait()开始的时候进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,在等待的时候使用yield()⑦就不会让CPU处于忙等待的状态。

这个实现比较“简单”的真实意义:使用自旋等待的情况下,如果让线程等待很长时间就不会很理想,并且如果超过count数量的线程对wait()进行调用,这个实现就没有办法工作了。如果想要很好的处理这样的情况,必须使用一个更加健壮(更加复杂)的实现。我依旧坚持对原子变量操作顺序的一致性,因为这会让事情更加简单,不过有时还是需要放松这样的约束。全局同步对于大规模并行架构来说是消耗巨大的,因为相关处理器会穿梭于存储栅栏状态的缓存行中(可见8.2.2中对乒乓缓存的讨论),所以需要格外的小心,来确保使用的是最佳同步方法。

不论怎么样,这些都需要你考虑到;需要有固定数量的线程执行同步循环。好吧,大多数情况下线程数量都是固定的。你可能还记得,代码起始部分的几个数据项,只需要几步就能得到其最终值。这就意味着,无论是让所有线程循环处理范围内的所有元素,还是让栅栏来同步线程,都会递减count的值。我会选择后者,因为其能避免线程做不必要的工作,仅仅是等待最终步骤完成。

这意味着你要将count改为一个原子变量,这样在多线程对其进行更新的时候,就不需要添加额外的同步:

std::atomic count;

初始化保持不变,不过当spaces的值被重置后,你需要显式的对count进行load()操作:

spaces=count.load();

这就是要对wait()函数的改动;现在需要一个新的成员函数来递减count。这个函数命名为done_waiting(),因为当一个线程完成其工作,并在等待的时候,才能对其进行调用它:

void done_waiting()
{
–count; // 1
if(!–spaces) // 2
{
spaces=count.load();  //** 3**
++generation;

}

}

实现中,首先要减少count①,所以下一次spaces将会被重置为一个较小的数。然后,需要递减spaces的值②。如果不做这些操作,有些线程将会持续等待,因为spaces被旧的count初始化,大于期望值。一组当中最后一个线程需要对计数器进行重置,并且递增generation的值③,就像在wait()里面做的那样。最重要的区别:最后一个线程不需要等待。当最后一个线程结束,整个等待也就随之结束!

现在就准备开始写部分和的第二个实现吧。在每一步中,每一个线程都在栅栏出调用wait(),来保证线程所处步骤一致,并且当所有线程都结束,那么最后一个线程会调用done_waiting()来减少count的值。如果使用两个缓存对原始数据进行保存,栅栏也可以提供你所需要的同步。每一步中,线程都会从原始数据或是缓存中读取数据,并且将新值写入对应位置。如果有线程先从原始数据处获取数据,那下一步就从缓存上获取数据(或相反)。这就能保证在读与写都是由独立线程完成,并不存在条件竞争。当线程结束等待循环,就能保证正确的值最终被写入到原始数据当中。下面的代码就是这样的实现。

清单8.13 通过两两更新对的方式实现partial_sum

struct barrier
{
std::atomic count;
std::atomicspaces;
std::atomicgeneration;
barrier(unsignedcount_):count(count_),spaces(count_),generation(0) {}
void wait()
{
unsigned constgen=generation.load();
if(!--spaces)
{
  spaces=count.load();
  ++generation;
}
else
{
  while(generation.load()==gen)
  {
    std::this_thread::yield();
  }
}
}
void done_waiting()
{
--count;
if(!--spaces)
{
  spaces=count.load();
  ++generation;
}
}
};
template
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typenameIterator::value_type value_type;
struct process_element //** 1**
{
void operator()(Iteratorfirst,Iterator last,std::vector<value_type>& buffer,unsignedi,barrier& b)
{
  value_type&ith_element=*(first+i);
  bool update_source=false;
  for(unsignedstep=0,stride=1;stride<=i;++step,stride*=2)
  {
    value_type const&source=(step%2) ? buffer[i]:ith_element; // **2**
    value_type&dest=(step%2) ? ith_element:buffer[i];
    value_type const& addend=(step%2)? buffer[i-stride]:*(first+i-stride);  //**3**
    dest=source+addend;  //** 4**
    update_source=!(step%2);
    b.wait();  // **5**
  }
  if(update_source)  // **6**
  {
    ith_element=buffer[i];
  }
  b.done_waiting();  // **7**
}
};
unsigned long constlength=std::distance(first,last);
if(length<=1)
return;
std::vectorbuffer(length);
barrier b(length);
std::vectorstd::threadthreads(length-1); // 8
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned longi=0;i<(length-1);++i)
{
threads[i]=std::thread(process_element(),first,last,std::ref(buffer),i,std::ref(b));//** 9**
}
process_element()(first,last,buffer,length-1,b); // 10
}

代码的整体结构应该不用说了。process_element类有函数调用操作可以用来做具体的工作①,就是运行一组线程⑨,并将线程存储到vector中⑧,同样还需要在主线程中对其进行调用⑩。这里与之前最大的区别就是,线程的数量是根据列表中的数据量来定的,而非根据std::thread::hardware_concurrency。如我之前所说,除非你使用的是一个大规模并行的机器,因为这上面的线程都十分廉价(虽然这样的方式并不是很好),还能为我们展示了其整体结构。这个结构在有较少线程的时候,每一个线程只能处理源数据中的部分数据,当没有足够的线程支持该结构时,效率要比传递算法低。

不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定stride的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④。然后,在开始下一步前,会在栅栏处等待⑤。当stride超出了给定数据的范围,当最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,在调用栅栏中的done_waiting()函数⑦。

注意这个解决方案并不是异常安全的。如果某个线程在process_element执行时抛出一个异常,其就会终止整个应用。这里可以使用一个std::promise来存储异常,就像在清单8.9中parallel_find的实现,或仅使用一个被互斥量保护的std::exception_ptr即可。

总结下这三个例子。希望其能保证我们了解8.1、8.2、8.3和8.4节中提到的设计考量,并且证明了这些技术在真实的代码中,需要承担些什么责任。

8.6总结****

本章我们讨论了很多东西。我们从划分线程间的工作开始(比如,数据提前划分或让线程形成流水线)。之后,以低层次视角来看多线程下的性能问题,顺带了解了伪共享和数据通讯;了解访问数据的模式对性能的影响。再后,了解了附加注意事项是如何影响并发代码设计的,比如:异常安全和可扩展性。最后,用一些并行算法实现来结束了本章,在设计这些并行算法实现时碰到的问题,在设计其他并行代码的时候也会遇到。

本章中,关于线程池的部分被转移了。线程池——一个预先设定的线程组,会将任务指定给池中的线程。很多不错的想法可以用来设计一个不过的线程池;所以我们将在下一章中来看一些有关线程池的问题,以及高级线程管理方式。

相关文章
|
6天前
|
并行计算 安全 调度
C++ 11新特性之并发
C++ 11新特性之并发
19 0
|
5月前
|
算法 C++ 开发者
【C++ 20 并发工具 std::barrier】掌握并发编程:深入理解C++的std::barrier
【C++ 20 并发工具 std::barrier】掌握并发编程:深入理解C++的std::barrier
226 0
|
3月前
|
安全 程序员 C++
C++一分钟之-C++中的并发容器
【7月更文挑战第17天】C++11引入并发容器,如`std::shared_mutex`、`std::atomic`和线程安全的集合,以解决多线程中的数据竞争和死锁。常见问题包括原子操作的误用、锁的不当使用和迭代器失效。避免陷阱的关键在于正确使用原子操作、一致的锁管理以及处理迭代器失效。通过示例展示了如何安全地使用这些工具来提升并发编程的安全性和效率。
37 1
|
5月前
|
安全 Go 对象存储
C++多线程编程:并发与同步的实战应用
本文介绍了C++中的多线程编程,包括基础知识和实战应用。C++借助`&lt;thread&gt;`库支持多线程,通过`std::thread`创建线程执行任务。文章探讨了并发与同步的概念,如互斥锁(Mutex)用于保护共享资源,条件变量(Condition Variable)协调线程等待与通知,以及原子操作(Atomic Operations)保证线程安全。实战部分展示了如何使用多线程进行并发计算,利用`std::async`实现异步任务并获取结果。多线程编程能提高效率,但也需注意数据竞争和同步问题,以确保程序的正确性。
|
5月前
|
安全 C++
C++多线程编程:并发与同步
C++多线程编程:并发与同步
36 0
|
5月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
316 2
|
5月前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
781 2
|
5月前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
236 1
|
5月前
|
算法 安全 Unix
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
148 0
|
5月前
|
存储 设计模式 算法
【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用
【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用
191 0
下一篇
无影云桌面