3.2.3 定位接口间的条件竞争
因为使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?并不是,你依旧需要确定数据是否受到了保护。回想之前双链表的例子,为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生——除了指针,整个数据结构和整个删除操作需要保护。这种情况下最简单的解决方案就是使用互斥量来保护整个链表,如清单3.1所示。
尽管链表的个别操作是安全的,但不意味着你就能走出困境;即使在一个很简单的接口中,依旧可能遇到条件竞争。例如,构建一个类似于 std::stack 结构的栈(清单3.3),除了构造函数和swap()以外,需要对 std::stack 提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素。即使修改了top(),使其返回一个拷贝而非引用(即遵循了3.2.2节的准则),对内部数据使用一个互斥量进行保护,不过这个接口仍存在条件竞争。这个问题不仅存在于基于互斥量实现的接口中,在无锁实现的接口中,条件竞争依旧会产生。这是接口的问题,与其实现方式无关。
清单3.3 std::stack 容器的实现
template<typename T,typename Container=std::deque<T> > class stack { public: explicit stack(const Container&); explicit stack(Container&& = Container()); template <class Alloc> explicit stack(const Alloc&); template <class Alloc> stack(const Container&, const Alloc&); template <class Alloc> stack(Container&&, const Alloc&); template <class Alloc> stack(stack&&, const Alloc&); bool empty() const; size_t size() const; T& top(); T const& top() const; void push(T const&); void push(T&&); void pop(); void swap(stack&&); template <class... Args> void emplace(Args&&... args); //C++14的新特性 };
虽然empty()和size()可能在返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。
当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:
stack<int> s; if (! s.empty()){ // 1 int const value = s.top(); // 2 s.pop(); // 3 do_something(value); }
以上是单线程安全代码:
- 对一个空栈使用top()是未定义行为。对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?
- 问题发生在接口设计上,所以解决的方法也就是改变接口设计。
有人会问:怎么改?
- 在这个简单的例子中,当调用top()时,发现栈已经是空的了,那么就抛出异常。
虽然这能直接解决这个问题,但这是一个笨拙的解决方案,这样的话,即使empty()返回false的情况下,你也需要异常捕获机制。本质上,这样的改变会让empty()成为一个多余函数。当仔细的观察过之前的代码段,就会发现另一个潜在的条件竞争在调用top()②和pop()之间。
假设两个线程运行着前面的代码,并且都引用同一个栈对象s。这并非罕见的情况,当为性能而使用线程时,多个线程在不同的数据上执行相同的操作是很平常的,并且共享同一个栈可以将工作分摊给它们。假设,一开始栈中只有两个元素,这时任一线程上的empty()和top()都存在竞争,只需要考虑可能的执行顺序即可。
当栈被一个内部互斥量所保护时,只有一个线程可以调用栈的成员函数,所以调用可以很好地交错,并且do_something()是可以并发运行的。在表3.1中,展示一种可能的执行顺序。
表3.1 一种可能执行顺序
当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。不仅是这样,在调用top()函数调用的过程中(两次),pop()函数都没有被调用。这样,在其中一个值再读取的时候,虽然不会出现“写后读”的情况,但其值已被处理了两次。这种条件竞争,比未定义的empty()/top()竞争更加严重;虽然其结果依赖于do_something()的结果,但因为看起来没有任何错误,就会让这个Bug很难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。TomCargill[1]指出当一个对象的拷贝构造函数在栈中抛出一个异常,这样的处理方式就会有问题。
在Herb Sutter[2]看来,这个问题可以从“异常安全”的角度完美解决,不过潜在的条件竞争,可能会组成一些新的组合。
说一些大家没有意识到的问题:假设有一个 stack<vector> ,vector是一个动态容器,当你拷贝一个vetcor,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。当vector中存有大量元素时,这种情况发生的可能性更大。
当pop()函数返回“弹出值”时(也就是从栈中将这个值移除),会有一个潜在的问题:这个值被返回到调用函数的时候,栈才被改变;但当拷贝数据的时候,调用函数抛出一个异常会怎么样? 如果事情真的发生了,要弹出的数据将会丢失;它的确从栈上移出了,但是拷贝失败了!
std::stack 的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从栈中移除(pop())。这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。不幸的是,这样的分割却制造了本想避免或消除的条件竞争。幸运的是,我们还有的别的选项,但是使用这些选项是要付出代价的。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:
std::vector<int> result; some_stack.pop(result);
大多数情况下,这种方式还不错,但缺点很明显:
- 需要构造出一个栈中类型的实例,用于接收目标值。
对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在这个阶段的代码不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:
- 即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。
很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。
虽然安全,但非可靠。尽管能在编译时可使用 std::is_nothrow_copy_constructible 和 std::is_nothrow_move_constructible 类型特征,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型, 那些有抛出异常的拷贝构造函数,但没有移动构造函数的类型往往更多(这种情况会随着人们习惯于C++11中的右值引用而有所改变)。如果这些类型不能被存储在线程安全的栈中,那将是多么的不幸。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常,这样你就能避免Cargill提到的异常问题了。缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于选择这个方案的接口,使用 std::shared_ptr 是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
例:定义线程安全的堆栈
清单3.4中是一个接口没有条件竞争的堆栈类定义,它实现了选项1和选项3:重载了pop(),使用一个局部引用去存储弹出值,并返回一个 std::shared_ptr<> 对象。它有一个简单的接口,只有两个函数:push()和pop();
清单3.4 线程安全的堆栈类定义(概述)
#include <exception> #include <memory> // For std::shared_ptr<> struct empty_stack: std::exception { const char* what() const throw(); }; template<typename T> class threadsafe_stack { public: threadsafe_stack(); threadsafe_stack(const threadsafe_stack&); threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除 void push(T new_value); std::shared_ptr<T> pop(); void pop(T& value); bool empty() const; };
削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。栈是不能直接赋值的,因为赋值操作已经删除了①(详见附录A,A.2节),并且这里没有swap()函数。栈可以拷贝的,假设栈中的元素可以拷贝。当栈为空时,pop()函数会抛出一个empty_stack异常,所以在empty()函数被调用后,其他部件还能正常工作。如选项3描述的那样,使用 std::shared_ptr 可以避免内存分配管理的问题,并避免多次使用new和delete操作。堆栈中的五个操作,现在就剩下三个:
- push(), pop()和empty()(这里empty()都有些多余)。
简化接口更有利于数据控制,可以保证互斥量将一个操作完全锁住。下面的代码将展示一个简单的实现——封装 std::stack<> 的线程安全堆栈。
清单3.5 扩充(线程安全)堆栈
#include <exception> #include <memory> #include <mutex> #include <stack> struct empty_stack: std::exception { const char* what() const throw() { return "empty stack!"; }; }; template<typename T> class threadsafe_stack { private: std::stack<T> data; mutable std::mutex m; public: threadsafe_stack():data(std::stack<T>()){} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; // 1 在构造函数体中的执行拷贝 } threadsafe_stack& operator=(const threadsafe_stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空 std::shared_ptr<T> const res(std::make_shared<T> (data.top())); // 在修改堆栈前,分配出返回值 data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); value=data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
堆栈可以拷贝——拷贝构造函数对互斥量上锁,再拷贝堆栈。构造函数体中①的拷贝使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。
之前对top()和pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的操作并未全覆盖到。不过,锁住的颗粒过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
使用多个互斥量保护所有的数据,细粒度锁也有问题。如前所述,当增大互斥量覆盖数据的粒度时,只需要锁住一个互斥量。但是,这种方案并非放之四海皆准,比如:互斥量正在保护一个独立类的实例;这种情况下,锁的状态的下一个阶段,不是离开锁定区域将锁定区域还给用户,就是有独立的互斥量去保护这个类的全部实例。当然,这两种方式都不理想。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:
- 死锁。
与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。
3.2.4 死锁:问题描述及解决方案
试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。
例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。现在有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。不过,事情没那么简单,比如:当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免数据被并发修改,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
很幸运,C++标准库有办法解决这个问题, std::lock ——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序清单中,就来看一下怎么在一个简单的交换操作中使用 std::lock 。
清单3.6 交换操作中使用 std::lock() 和 std::lock_guard
// 这里的std::lock()需要包含<mutex>头文件 class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::lock(lhs.m,rhs.m); // 1 std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2 std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3 swap(lhs.some_detail,rhs.some_detail); } };
首先,检查参数是否是不同的实例,因为操作试图获取 std::mutex 对象上的锁,所以当其被获取时,结果很难预料。(一个互斥量可以在同一线程上多次上锁,标准库中 std::recursive_mutex 提供这样的功能。详情见3.3.3节)。然后,调用 std::lock() ①锁住两个互斥量,并且两个 std:lock_guard 实例已经创建好②③。提供 std::adopt_lock 参数除了表示 std::lock_guard 对象可获取锁之外,还将锁交由 std::lock_guard 对象管理,而不需要 std::lock_guard 对象再去构建新的锁。
这样,就能保证在大多数情况下,函数退出时互斥量能被正确的解锁(保护操作可能会抛出一个异常),也允许使用一个简单的“return”作为返回。还有,当使用 std::lock 去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到 std::lock 之外。当 std::lock 成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以 std::lock 要么将两个锁都锁住,要不一个都不锁。
C++17对这种情况提供了支持, std::scoped_lock<> 一种新的RAII类型模板类型,
与 std::lock_guard<> 的功能等价,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。互斥量支持构造即上锁,与 std::lock 的用法相同,其解锁阶段是在析构中进行。清单3.6中swap()操作可以重写如下:
void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::scoped_lock guard(lhs.m,rhs.m); // 1 swap(lhs.some_detail,rhs.some_detail); }
这里使用了C++17的另一个特性:自动推导模板参数。如果你手头上有支持C++17的编译器(那你就能使用 std::scoped_lock 了,因为其是C++17标准库中的一个工具),C++17可以通过隐式参数模板类型推导机制, 通过传递的对形象类型来构造实例①。这行代码等价于下面参数全给的版本:
std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
std::scoped_lock 的好处在于,可以将所有 std::lock 替换掉,从而减少潜在错误的发生。
虽然 std::lock (和 std::scoped_lock<> )可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。这时,依赖于开发者的纪律性(译者:也就是经验),来确保你的程序不会死锁。这并不简单:死锁是多线程编程中一个令人相当头痛的问题,并且死锁经常是不可预见的,因为在大多数时间里,所有工作都能很好的完成。不过,一些相对简单的规则能帮助写出“无死锁”的代码。
3.2.5 避免死锁的进阶指导
死锁通常是由于对锁的不当操作造成,但也不排除死锁出现在其他地方。无锁的情况下,仅需要每个 std::thread 对象调用join(),两个线程就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁
第一个建议往往是最简单的:
- 一个线程已获得一个锁时,再别去获取第二个。因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一
个 std::lock 来做这件事(对获取锁的操作上锁),避免产生死锁。避免在持有锁时调用用户提供的代码
第二个建议是次简单的:
- 因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。当你正在写一份通用代码,例如3.2.3中的栈,每一个操作的参数类型,都在用户提供的代码中定义,就需要其他指导意见来帮助你。
使用固定顺序获取锁
当硬性条件要求你获取两个或两个以上的锁,并且不能使用 std::lock 单独操作来获取它
们;那么最好在每个线程上,用固定的顺序获取它们(锁)。3.2.4节中提到,当需要获取两个互斥量时,避免死锁的方法,关键是如何在线程之间,以一定的顺序获取锁。一些情况下,这种方式相对简单。比如,3.2.3节中的栈——每个栈实例中都内置有互斥量,但是对数据成员存储的操作上,栈就需要调用用户提供的代码。虽然,可以添加一些约束,对栈上存储的数据项不做任何操作,对数据项的处理仅限于栈自身。这会给用户提供的栈增加一些负担,但是一个容器很少去访问另一个容器中存储的数据,即使发生了也会很明显,所以这对于通用栈来说并不是一个特别沉重的负担。
其他情况下,这就没那么简单了,例如:3.2.4节中的交换操作,这种情况下你可能同时锁住多个互斥量(有时不会发生)。回看3.1节中那个链表连接例子时,会看到列表中的每个节点都会有一个互斥量保护。为了访问列表,线程必须获取他们感兴趣节点上的互斥锁。当一个线程删除一个节点,它必须获取三个节点上的互斥锁:将要删除的节点,两个邻接节点(因为他们也会被修改)。同样的,为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。
这种“手递手”锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用“手递手”锁遍历列表,他们将执行到列表中间部分时,发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景,如图3.2所示。
图3.2 不同线程以相反顺序访问列表所造成的死锁
当A、C节点中的B节点正在被删除时,如果有线程在已获取A和C上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了A和C上的锁。
这里提供一种避免死锁的方式,定义遍历的顺序,一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁发生的可能性,不允许反向遍历的列表上。类似的约定常被用来建立其他的数据结构。